public inbox for [email protected]  
From: Bharath Rupireddy <[email protected]>
To: PostgreSQL-development <[email protected]>
To: Andres Freund <[email protected]>
To: Luc Vlaming <[email protected]>
Subject: New Table Access Methods for Multi and Single Inserts
Date: Tue, 8 Dec 2020 18:27:37 +0530
Message-ID: <CALj2ACVi9eTRYR=gdca5wxtj3Kk_9q9qVccxsS1hngTGOCjPwQ@mail.gmail.com>

Hi,

Currently, for any component (such as COPY, CTAS[1], CREATE/REFRESH
Mat View[1], INSERT INTO SELECTs[2]), multi insert logic such as buffer
slot allocation and maintenance, the decision to flush, and cleanup
needs to be implemented outside the table_multi_insert() API. The main
problem is that this fails to take the underlying storage engine's
capabilities into consideration; for more details on this point, refer
to the discussion on multi inserts in the CTAS thread [1]. It also
creates a lot of duplicate code, which is error prone and hard to
maintain.

More importantly, in another thread [3], @Andres Freund suggested
making the table insert APIs look more like the 'scan' APIs, i.e.
insert_begin, insert, insert_end. The main advantage of doing this is,
quoting his statement in [3]: "more importantly it'd allow an AM to
optimize operations across multiple inserts, which is important for
column stores."

I propose to introduce new table access methods for both multi and
single inserts, based on the prototype suggested by Andres in [3]. The
main design goal of these new APIs is to give tableam developers the
flexibility to implement multi insert logic that depends on the
underlying storage engine.

The APIs are listed below. Please have a look at
v1-0001-New-Table-AMs-for-Multi-and-Single-Inserts.patch for details
of the new data structures and the API functionality. Note that I have
temporarily used the XX_v2 naming; we can change it later.

TableInsertState* table_insert_begin(initial_args);
void table_insert_v2(TableInsertState *state, TupleTableSlot *slot);
void table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot);
void table_multi_insert_flush(TableInsertState *state);
void table_insert_end(TableInsertState *state);
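Below is a minimal, self-contained sketch of the multi insert call sequence and of the flush rules documented for heap_insert_begin() in patch 0001. It is not PostgreSQL code and rests on a few assumptions:

- A hypothetical ToyInsertState only counts tuples and bytes. It does not buffer real TupleTableSlots.
- The toy_* functions are illustrative stand-ins for table_insert_begin(), table_multi_insert_v2(), table_multi_insert_flush() and table_insert_end().
- Following the heap_insert_begin() comment, max_size <= 0 means the batch is flushed only when all max_slots are filled.
- The single insert path, table_insert_v2(), needs no buffering and is not modeled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for TableInsertState + TableMultiInsertState. */
typedef struct ToyInsertState
{
    int32_t max_slots;      /* flush once this many tuples are buffered */
    int64_t max_size;       /* flush once buffered bytes reach this; <= 0 disables */
    int32_t cur_slots;      /* tuples currently buffered */
    int64_t cur_size;       /* bytes currently buffered */
    bool    flushed;        /* did the most recent call flush? */
    int64_t total_written;  /* tuples handed to "storage" so far */
} ToyInsertState;

/* Models table_insert_begin(..., is_multi = true, max_slots, max_size). */
static ToyInsertState *
toy_insert_begin(int32_t max_slots, int64_t max_size)
{
    ToyInsertState *s = calloc(1, sizeof(ToyInsertState));

    if (s != NULL)
    {
        s->max_slots = max_slots;
        s->max_size = max_size;
    }
    return s;
}

/* Models table_multi_insert_flush(): write out whatever is buffered. */
static void
toy_multi_insert_flush(ToyInsertState *s)
{
    s->flushed = (s->cur_slots > 0);
    s->total_written += s->cur_slots;   /* stands in for heap_multi_insert() */
    s->cur_slots = 0;
    s->cur_size = 0;
}

/* Models table_multi_insert_v2(): buffer one tuple, flush when a limit is hit. */
static void
toy_multi_insert(ToyInsertState *s, int64_t tuple_size)
{
    s->flushed = false;
    s->cur_slots++;                     /* always count the buffered tuple */
    if (s->max_size > 0)
        s->cur_size += tuple_size;      /* track bytes only when a limit is set */

    if (s->cur_slots >= s->max_slots ||
        (s->max_size > 0 && s->cur_size >= s->max_size))
        toy_multi_insert_flush(s);
}

/* Models table_insert_end(): the caller must have flushed everything first. */
static void
toy_insert_end(ToyInsertState *s)
{
    assert(s->cur_slots == 0);
    free(s);
}
```

With max_slots = 3 and max_size = 0, the third toy_multi_insert() call triggers a flush. Any tuples left over at the end must be flushed explicitly before toy_insert_end(), mirroring the Assert in heap_insert_end().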

I'm attaching a few patches to show that these APIs work, avoid a lot
of duplicate code and make life easier. Better comments can be added
later. If these APIs and patches look okay, we can also consider using
them in other places, such as nodeModifyTable.c.

v1-0001-New-Table-AMs-for-Multi-and-Single-Inserts.patch --->
introduces new table access methods for multi and single inserts. It
also implements/rearranges the existing heap AM code around these new
APIs.

v1-0002-CTAS-and-REFRESH-Mat-View-With-New-Multi-Insert-Table-AM.patch
---> uses the new multi insert table access methods in CREATE TABLE AS,
CREATE MATERIALIZED VIEW and REFRESH MATERIALIZED VIEW.

v1-0003-ATRewriteTable-With-New-Single-Insert-Table-AM.patch ---> uses
the new single insert table access method in the ALTER TABLE table
rewrite code.

v1-0004-COPY-With-New-Multi-and-Single-Insert-Table-AM.patch ---> uses
the new single and multi insert table access methods in the COPY code.

Thoughts?

Many thanks to Robert, Vignesh and Dilip for offlist discussion.

[1] - https://www.postgresql.org/message-id/4eee0730-f6ec-e72d-3477-561643f4b327%40swarm64.com
[2] - https://www.postgresql.org/message-id/20201124020020.GK24052%40telsasoft.com
[3] - https://www.postgresql.org/message-id/20200924024128.kyk3r5g7dnu3fxxx%40alap3.anarazel.de

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com


Attachments:

  [application/octet-stream] v1-0001-New-Table-AMs-for-Multi-and-Single-Inserts.patch (20.4K, 2-v1-0001-New-Table-AMs-for-Multi-and-Single-Inserts.patch)
From 8dd54a131d27651f36587a784e0eedadfdf4ee5a Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Tue, 8 Dec 2020 12:02:31 +0530
Subject: [PATCH v1] New Table AMs for Multi and Single Inserts

This patch introduces new table access methods for multi and
single inserts. It also implements/rearranges the existing heap AM
code around these new APIs.

The main design goal of these new APIs is to give tableam developers
the flexibility to implement multi insert logic that depends on the
underlying storage engine. Currently, all the underlying storage
engines follow the same multi insert logic (when and how to flush
the buffered tuples, how to calculate tuple sizes, and so on), and
this logic doesn't take the underlying storage engine's capabilities
into account.

This also avoids duplicating the multi insert code (for the existing
COPY and the upcoming CTAS, CREATE/REFRESH MAT VIEW and INSERT
SELECTs). The bulk insert state allocation and deallocation can also
be moved inside these APIs.
---
 src/backend/access/heap/heapam.c         | 245 +++++++++++++++++++++++
 src/backend/access/heap/heapam_handler.c |   5 +
 src/backend/access/table/tableamapi.c    |   7 +
 src/backend/executor/execTuples.c        |  83 +++++++-
 src/include/access/heapam.h              |  11 +
 src/include/access/tableam.h             | 120 +++++++++++
 src/include/executor/tuptable.h          |   1 +
 7 files changed, 471 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a9583f3103..48c93ccce7 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -66,6 +66,7 @@
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/spccache.h"
@@ -2371,6 +2372,250 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 	pgstat_count_heap_insert(relation, ntuples);
 }
 
+/*
+ * Allocate and initialize TableInsertState.
+ *
+ * If alloc_bistate is true, the bulk insert state is allocated; otherwise not.
+ *
+ * For single inserts:
+ *  1) Specify is_multi as false, then the multi insert state is NULL.
+ *  2) mi_max_slots and mi_max_size are ignored, but it is good practice to
+ * 	   pass negative values.
+ *
+ * For multi inserts:
+ *  1) Specify is_multi as true, then the multi insert state is allocated.
+ *  2) Specify mi_max_slots > 0 i.e. the number of slots to buffer.
+ * 	   mi_max_slots <= 0 is invalid.
+ *  3) Specify mi_max_size > 0 i.e. the total tuple size (in bytes) the
+ * 	   buffered slots can hold until flush.
+ *  4) Flush the buffers either if all the mi_max_slots are filled or if the
+ * 	   total tuple size of the so far buffered slots is >= mi_max_size. If
+ *	   mi_max_size <= 0, then flush the buffers only when all the mi_max_slots
+ *	   are filled.
+ *
+ *  Other input parameters i.e. relation, command id, options are common for
+ *  both single and multi inserts.
+ */
+TableInsertState* heap_insert_begin(Relation rel, CommandId cid, int options,
+									bool alloc_bistate, bool is_multi,
+									int32 mi_max_slots, int64 mi_max_size)
+{
+	TableInsertState *state = NULL;
+
+	state = palloc0(sizeof(TableInsertState));
+
+	state->rel = rel;
+	state->cid = cid;
+	state->options = options;
+
+	if (alloc_bistate)
+		state->bistate = GetBulkInsertState();
+	else
+		state->bistate = NULL;
+
+	if (is_multi)
+	{
+		if (mi_max_slots > 0)
+		{
+			state->mistate = palloc0(sizeof(TableMultiInsertState));
+			state->mistate->slots =
+							palloc0(sizeof(TupleTableSlot *) * mi_max_slots);
+			state->mistate->max_slots = mi_max_slots;
+			state->mistate->max_size = mi_max_size;
+			state->mistate->cur_slots = 0;
+			state->mistate->cur_size = 0;
+			state->mistate->cur_tup_size = -1;
+			state->mistate->clear_slots = true;
+			state->mistate->flushed	= false;
+
+			/*
+			 * Create a temporary memory context so that we can reset once per
+			 * multi insert batch.
+			 */
+			state->mistate->context =
+							AllocSetContextCreate(CurrentMemoryContext,
+												  "heap_multi_insert",
+												  ALLOCSET_DEFAULT_SIZES);
+		}
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("invalid number of slots specified for heap multi inserts")));
+	}
+	else
+		state->mistate = NULL;
+
+	return state;
+}
+
+/* Insert a single tuple from a slot into the table. */
+void heap_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	bool		shouldFree = true;
+	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+	/* Update the tuple with table oid */
+	slot->tts_tableOid = RelationGetRelid(state->rel);
+	tuple->t_tableOid = slot->tts_tableOid;
+
+	/* Perform the insertion, and copy the resulting ItemPointer */
+	heap_insert(state->rel, tuple, state->cid, state->options, state->bistate);
+	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+	if (shouldFree)
+		pfree(tuple);
+}
+
+/*
+ * Buffer the input slot and, once the batch is complete, insert all the
+ * buffered tuples into the table at once.
+ *
+ * Compute the tuple size only if mi_max_size (the total tuple size, in bytes,
+ * the buffered slots can hold until a flush) is specified and the current
+ * tuple size, cur_tup_size, is not already known.
+ *
+ * Flush the tuples from the buffered slots into the table:
+ *  1) either if all the slots are filled up
+ *  2) or if mi_max_size is specified and the total tuple size of the
+ * 	   currently buffered slots is >= mi_max_size.
+ */
+void heap_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	TupleTableSlot  *batchslot;
+
+	Assert(state->mistate != NULL);
+	Assert(state->mistate->slots != NULL);
+
+	if (state->mistate->slots[state->mistate->cur_slots] == NULL)
+		state->mistate->slots[state->mistate->cur_slots] =
+										table_slot_create(state->rel, NULL);
+
+	batchslot = state->mistate->slots[state->mistate->cur_slots];
+
+	ExecCopySlot(batchslot, slot);
+
+	/* Reset the flush state if previously set. */
+	if (state->mistate->flushed)
+		state->mistate->flushed = false;
+
+	/* The slot is now buffered; count it whether or not max_size is set. */
+	state->mistate->cur_slots++;
+
+	/* Compute the tuple size only if asked to do so. */
+	if (state->mistate->max_size > 0 && state->mistate->cur_tup_size <= 0)
+	{
+		/*
+		 * The tuple size is not known in the caller. Calculate it after the
+		 * original slot is copied, because the copied slot type, and hence
+		 * the tuple size, may change.
+		 */
+		state->mistate->cur_size +=
+			GetTupleSize(batchslot, state->mistate->max_size);
+	}
+	else if (state->mistate->max_size > 0 && state->mistate->cur_tup_size > 0)
+	{
+		/* Tuple size is known in the caller, just use it. */
+		state->mistate->cur_size += state->mistate->cur_tup_size;
+	}
+
+	/* Reset the caller-supplied size so that it is not reused. */
+	state->mistate->cur_tup_size = -1;
+
+	if (state->mistate->cur_slots >= state->mistate->max_slots ||
+		(state->mistate->max_size > 0 &&
+		 state->mistate->cur_size >= state->mistate->max_size))
+	{
+		heap_multi_insert_flush(state);
+	}
+}
+
+/*
+ * Flush the tuples from the buffered slots, if any.
+ *
+ * This is useful, for instance, when one partition cannot use multi inserts
+ * but others can and have already buffered a few slots: those slots must be
+ * flushed, so that their tuples are visible, before the partition that does
+ * not support multi inserts proceeds with single inserts.
+ */
+void heap_multi_insert_flush(TableInsertState *state)
+{
+	Assert(state->mistate != NULL);
+
+	if (state->mistate->cur_slots > 0)
+	{
+		MemoryContext oldcontext;
+
+		oldcontext = MemoryContextSwitchTo(state->mistate->context);
+
+		heap_multi_insert(state->rel, state->mistate->slots,
+						state->mistate->cur_slots, state->cid,
+						state->options, state->bistate);
+
+		MemoryContextReset(state->mistate->context);
+		MemoryContextSwitchTo(oldcontext);
+
+		/*
+		 * Do not always clear the slots. Callers may still need them for index
+		 * insertions or after row trigger execution, in which case the callers
+		 * must clear the tuples themselves before the slots are reused for the
+		 * next insert batch.
+		 */
+		if (state->mistate->clear_slots)
+		{
+			int i;
+
+			for (i = 0; i < state->mistate->cur_slots; i++)
+				ExecClearTuple(state->mistate->slots[i]);
+		}
+
+		state->mistate->cur_slots = 0;
+		state->mistate->cur_size = 0;
+		state->mistate->cur_tup_size = -1;
+		state->mistate->flushed = true;
+	}
+	else
+		state->mistate->flushed = false;
+}
+
+/*
+ * Clean up the TableInsertState.
+ *
+ * For multi inserts, make sure to flush all the remaining buffers with
+ * heap_multi_insert_flush before calling this function. The buffered slots
+ * are dropped, the short-lived memory context is deleted and mistate is freed.
+ *
+ * Free the bulk insert state using FreeBulkInsertState(), if it exists.
+ *
+ * Free up TableInsertState.
+ */
+void heap_insert_end(TableInsertState *state)
+{
+	if (state->mistate)
+	{
+		int i;
+
+		/*
+		 * Ensure to flush all the remaining buffers with
+		 * heap_multi_insert_flush before calling heap_insert_end.
+		 */
+		Assert(state->mistate->cur_slots == 0);
+
+		for (i = 0; i < state->mistate->max_slots && state->mistate->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(state->mistate->slots[i]);
+
+		if (state->mistate->context)
+			MemoryContextDelete(state->mistate->context);
+
+		pfree(state->mistate->slots);
+		pfree(state->mistate);
+	}
+
+	if (state->bistate)
+		FreeBulkInsertState(state->bistate);
+
+	pfree(state);
+}
+
 /*
  *	simple_heap_insert - insert a tuple
  *
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 3eea215b85..eb3da12d9c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2554,6 +2554,11 @@ static const TableAmRoutine heapam_methods = {
 	.tuple_insert_speculative = heapam_tuple_insert_speculative,
 	.tuple_complete_speculative = heapam_tuple_complete_speculative,
 	.multi_insert = heap_multi_insert,
+	.tuple_insert_begin = heap_insert_begin,
+	.tuple_insert_v2 = heap_insert_v2,
+	.multi_insert_v2 = heap_multi_insert_v2,
+	.multi_insert_flush = heap_multi_insert_flush,
+	.tuple_insert_end = heap_insert_end,
 	.tuple_delete = heapam_tuple_delete,
 	.tuple_update = heapam_tuple_update,
 	.tuple_lock = heapam_tuple_lock,
diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 58de0743ba..6bec0659e4 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -78,6 +78,13 @@ GetTableAmRoutine(Oid amhandler)
 	Assert(routine->tuple_complete_speculative != NULL);
 
 	Assert(routine->multi_insert != NULL);
+
+	Assert(routine->tuple_insert_begin != NULL);
+	Assert(routine->tuple_insert_v2 != NULL);
+	Assert(routine->multi_insert_v2 != NULL);
+	Assert(routine->multi_insert_flush != NULL);
+	Assert(routine->tuple_insert_end != NULL);
+
 	Assert(routine->tuple_delete != NULL);
 	Assert(routine->tuple_update != NULL);
 	Assert(routine->tuple_lock != NULL);
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 4c90ac5236..f93b1a49a8 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -159,7 +159,11 @@ tts_virtual_materialize(TupleTableSlot *slot)
 	if (TTS_SHOULDFREE(slot))
 		return;
 
-	/* compute size of memory required */
+	/*
+	 * Compute size of memory required. GetTupleSize() reuses this size
+	 * calculation for virtual slots, so keep the two in sync when making
+	 * changes or fixes.
+	 */
 	for (int natt = 0; natt < desc->natts; natt++)
 	{
 		Form_pg_attribute att = TupleDescAttr(desc, natt);
@@ -1239,6 +1243,83 @@ ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
 	pfree(slot);
 }
 
+/*
+ * GetTupleSize - Compute the tuple size given a table slot.
+ *
+ * For heap, buffer heap and minimal tuple slots, return the length of the
+ * tuple stored in the slot. For virtual slots, which do not store a tuple, the
+ * size is computed from the attribute values. If the computed size reaches
+ * the given maxsize (when non-zero), stop early rather than spending time on
+ * further unnecessary calculation.
+ *
+ * Important Notes:
+ * 1) The size calculation for virtual slots is adapted from
+ * 	  tts_virtual_materialize(); keep the two in sync when making changes or
+ * 	  fixes.
+ * 2) Currently, GetTupleSize() handles the existing heap, buffer, minimal and
+ * 	  virtual slots. Add the related code here if any new slot type is
+ *    introduced.
+ */
+inline Size
+GetTupleSize(TupleTableSlot *slot, Size maxsize)
+{
+	Size sz = 0;
+	HeapTuple tuple = NULL;
+
+	if (TTS_IS_HEAPTUPLE(slot))
+		tuple = ((HeapTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_BUFFERTUPLE(slot))
+		tuple = ((BufferHeapTupleTableSlot *) slot)->base.tuple;
+	else if (TTS_IS_MINIMALTUPLE(slot))
+		tuple = ((MinimalTupleTableSlot *) slot)->tuple;
+	else if (TTS_IS_VIRTUAL(slot))
+	{
+		/*
+		 * This size calculation is adapted from tts_virtual_materialize().
+		 * Keep the two in sync when making changes or fixes here or in
+		 * tts_virtual_materialize().
+		 */
+		TupleDesc	desc = slot->tts_tupleDescriptor;
+
+		for (int natt = 0; natt < desc->natts; natt++)
+		{
+			Form_pg_attribute att = TupleDescAttr(desc, natt);
+			Datum		val;
+
+			/*
+			 * Unlike tts_virtual_materialize(), do not skip by-value attributes.
+			 */
+			if (slot->tts_isnull[natt])
+				continue;
+
+			val = slot->tts_values[natt];
+
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(val)))
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz += EOH_get_flat_size(DatumGetEOHP(val));
+			}
+			else
+			{
+				sz = att_align_nominal(sz, att->attalign);
+				sz = att_addlength_datum(sz, att->attlen, val);
+			}
+
+			/*
+			 * Stop early once the computed size reaches maxsize; the caller
+			 * only needs to know that the limit has been reached.
+			 */
+			if (maxsize != 0 && sz >= maxsize)
+				break;
+		}
+	}
+
+	if (tuple != NULL && !TTS_IS_VIRTUAL(slot))
+		sz = tuple->t_len;
+
+	return sz;
+}
 
 /* ----------------------------------------------------------------
  *				  tuple table slot accessor functions
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 54b2eb7378..d938efbbc5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -139,6 +139,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
+
+extern TableInsertState* heap_insert_begin(Relation rel, CommandId cid,
+										   int options, bool alloc_bistate,
+										   bool is_multi, int32 mi_max_slots,
+										   int64 mi_max_size);
+extern void heap_insert_v2(TableInsertState *state, TupleTableSlot *slot);
+extern void heap_multi_insert_v2(TableInsertState *state,
+								 TupleTableSlot *slot);
+extern void heap_multi_insert_flush(TableInsertState *state);
+extern void heap_insert_end(TableInsertState *state);
+
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..60d4cd8c8b 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -128,6 +128,80 @@ typedef struct TM_FailureData
 	bool		traversed;
 } TM_FailureData;
 
+/* Holds the multi insert related information. */
+typedef struct TableMultiInsertState
+{
+	/* Switch to short-lived memory context before flushing. */
+	MemoryContext       context;
+
+	/* Array of buffered slots. */
+	TupleTableSlot      **slots;
+
+	/* Maximum number of slots that can be buffered. */
+	int32               max_slots;
+
+	/* Number of slots that are currently buffered. */
+	int32               cur_slots;
+
+	/*
+	 * Maximum size (in bytes) of all the tuples that a single batch of
+	 * buffered slots can hold. This parameter is optional.
+	 */
+	int64               max_size;
+
+	/*
+	 * Total tuple size (in bytes) of the slots that are currently buffered. If
+	 * max_size is specified, then flush the buffered slots when cur_size >=
+	 * max_size.
+	 */
+	int64               cur_size;
+
+	/*
+	 * Current tuple size (in bytes). Set this each time before calling
+	 * table_multi_insert_v2 if the tuple size is already known (as in COPY,
+	 * where the size of each tuple is known after parsing the input line).
+	 * table_multi_insert_v2 then adds this value to cur_size instead of
+	 * calculating the tuple size again, and resets it to -1 after using it.
+	 * Default is -1.
+	 */
+	int64               cur_tup_size;
+
+	/*
+	 * Whether to clear the buffered slots after each flush. If the relation
+	 * has indexes or after row triggers, the buffered slots are still needed
+	 * after the flush, in which case set this to false and have the caller
+	 * clear them with ExecClearTuple() once it is done with them. If true,
+	 * which is the default, the flush clears the slots itself.
+	 *
+	 * It is good to set clear_slots (based on whether the table has any
+	 * indexes or after row triggers) at the beginning of the multi insert
+	 * operation. It is better to set it to false for the final flush before
+	 * ending the multi insert operation, to save the ExecClearTuple() cost,
+	 * since the buffered slots will be dropped anyway by
+	 * table_insert_end().
+	 */
+	bool                clear_slots;
+
+	/*
+	 * Initially false; set to true whenever the buffered slots are flushed
+	 * (by table_multi_insert_v2 or table_multi_insert_flush). The caller can
+	 * use this flag to insert into indexes, execute after row triggers, etc.
+	 */
+	bool				flushed;
+} TableMultiInsertState;
+
+/* Holds the table insert state. */
+typedef struct TableInsertState
+{
+	Relation                rel;
+	/* Bulk insert state if requested, otherwise NULL. */
+	struct BulkInsertStateData     *bistate;
+	/* Multi insert state if requested, otherwise NULL. */
+	struct TableMultiInsertState   *mistate;
+	CommandId               cid;
+	int                     options;
+} TableInsertState;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -376,6 +450,19 @@ typedef struct TableAmRoutine
 	void		(*multi_insert) (Relation rel, TupleTableSlot **slots, int nslots,
 								 CommandId cid, int options, struct BulkInsertStateData *bistate);
 
+	TableInsertState* (*tuple_insert_begin) (Relation rel, CommandId cid,
+											 int options, bool alloc_bistate,
+											 bool is_multi, int32 mi_max_slots,
+											 int64 mi_max_size);
+
+	void (*tuple_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void (*multi_insert_v2) (TableInsertState *state, TupleTableSlot *slot);
+
+	void (*multi_insert_flush) (TableInsertState *state);
+
+	void (*tuple_insert_end) (TableInsertState *state);
+
 	/* see table_tuple_delete() for reference about parameters */
 	TM_Result	(*tuple_delete) (Relation rel,
 								 ItemPointer tid,
@@ -1237,6 +1324,39 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
 								  cid, options, bistate);
 }
 
+static inline TableInsertState*
+table_insert_begin(Relation rel, CommandId cid, int options,
+				   bool alloc_bistate, bool is_multi, int32 mi_max_slots,
+				   int64 mi_max_size)
+{
+	return rel->rd_tableam->tuple_insert_begin(rel, cid, options, alloc_bistate,
+										is_multi, mi_max_slots, mi_max_size);
+}
+
+static inline void
+table_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->tuple_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_v2(TableInsertState *state, TupleTableSlot *slot)
+{
+	state->rel->rd_tableam->multi_insert_v2(state, slot);
+}
+
+static inline void
+table_multi_insert_flush(TableInsertState *state)
+{
+	state->rel->rd_tableam->multi_insert_flush(state);
+}
+
+static inline void
+table_insert_end(TableInsertState *state)
+{
+	state->rel->rd_tableam->tuple_insert_end(state);
+}
+
 /*
  * Delete a tuple.
  *
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index f7df70b5ab..d7c284d8e3 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -330,6 +330,7 @@ extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
 								 int lastAttNum);
 extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
 
+extern Size GetTupleSize(TupleTableSlot *slot, Size maxsize);
 
 #ifndef FRONTEND
 
-- 
2.25.1
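The virtual-slot branch of GetTupleSize() in patch 0001 estimates a tuple's data size by aligning each attribute and adding its length. Below is a simplified model of that computation, not PostgreSQL code, under these assumptions:

- A hypothetical ToyAttr replaces Form_pg_attribute.
- A plain power-of-two round-up replaces att_align_nominal().
- Expanded objects and the tuple header are ignored.
- NULLs are skipped and each non-NULL attribute is counted once.
- Like the patch, it stops early once maxsize is reached.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for Form_pg_attribute. */
typedef struct ToyAttr
{
    int     attlen;     /* > 0: fixed length in bytes; -1: varlena */
    size_t  attalign;   /* alignment in bytes: 1, 2, 4 or 8 */
} ToyAttr;

/* Round len up to a multiple of align, in the spirit of TYPEALIGN(). */
static size_t
toy_align(size_t len, size_t align)
{
    assert(align != 0 && (align & (align - 1)) == 0);   /* power of two */
    return (len + align - 1) & ~(align - 1);
}

/*
 * Data size of a row held in a virtual-slot-like form: NULLs are skipped,
 * every other attribute is aligned and then adds its length exactly once.
 * varlen[i] is the full datum size (header included) used when attlen == -1.
 * Stops early once maxsize (if non-zero) is reached.
 */
static size_t
toy_tuple_size(const ToyAttr *attrs, const bool *isnull,
               const size_t *varlen, int natts, size_t maxsize)
{
    size_t  sz = 0;

    for (int i = 0; i < natts; i++)
    {
        if (isnull[i])
            continue;
        sz = toy_align(sz, attrs[i].attalign);
        sz += (attrs[i].attlen > 0) ? (size_t) attrs[i].attlen : varlen[i];
        if (maxsize != 0 && sz >= maxsize)
            break;
    }
    return sz;
}
```

Take an (int4, int2, int8, text) row with a 10-byte text datum. The attribute offsets are 0, 4, 8 and 16, so the data size is 26 bytes. With maxsize = 10 the loop stops after the int8 column, at 16.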



  [application/octet-stream] v1-0003-ATRewriteTable-With-New-Single-Insert-Table-AM.patch (2.1K, 3-v1-0003-ATRewriteTable-With-New-Single-Insert-Table-AM.patch)
From 72351f7dbf0353ec1fcd8bb14a1563806eb62218 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Tue, 8 Dec 2020 12:20:17 +0530
Subject: [PATCH v1] ATRewriteTable With New Single Insert Table AM

This patch uses the new single insert table access method in the
ALTER TABLE table rewrite code.
---
 src/backend/commands/tablecmds.c | 28 ++++++++++++----------------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 46f1637e77..80f013036e 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5182,10 +5182,8 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	int			i;
 	ListCell   *l;
 	EState	   *estate;
-	CommandId	mycid;
-	BulkInsertState bistate;
-	int			ti_options;
 	ExprState  *partqualstate = NULL;
+	TableInsertState *istate = NULL;
 
 	/*
 	 * Open the relation(s).  We have surely already locked the existing
@@ -5206,16 +5204,13 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	 */
 	if (newrel)
 	{
-		mycid = GetCurrentCommandId(true);
-		bistate = GetBulkInsertState();
-		ti_options = TABLE_INSERT_SKIP_FSM;
-	}
-	else
-	{
-		/* keep compiler quiet about using these uninitialized */
-		mycid = 0;
-		bistate = NULL;
-		ti_options = 0;
+		istate = table_insert_begin(newrel,
+									GetCurrentCommandId(true),
+									TABLE_INSERT_SKIP_FSM,
+									true,
+									false,
+									-1,
+									-1);
 	}
 
 	/*
@@ -5510,8 +5505,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
 			/* Write the tuple out to the new relation */
 			if (newrel)
-				table_tuple_insert(newrel, insertslot, mycid,
-								   ti_options, bistate);
+				table_insert_v2(istate, insertslot);
 
 			ResetExprContext(econtext);
 
@@ -5532,7 +5526,9 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	table_close(oldrel, NoLock);
 	if (newrel)
 	{
-		FreeBulkInsertState(bistate);
+		int ti_options = istate->options;
+
+		table_insert_end(istate);
 
 		table_finish_bulk_insert(newrel, ti_options);
 
-- 
2.25.1
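The clear_slots and flushed fields of TableMultiInsertState (patch 0001) form a small protocol between the AM and its caller. The COPY patch below sets clear_slots to false when the target has indexes or after row triggers. Below is a toy model of that protocol under these assumptions:

- Plain ints stand in for tuples.
- A hypothetical nflushed field records the size of the last flushed batch. The patch's TableMultiInsertState has no such field.

When clear_slots is false, the flushed slots survive the flush. The caller can then process them (for example, insert index entries) and clear them itself.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_MAX_SLOTS 4

/* Hypothetical model of the clear_slots / flushed fields. */
typedef struct ToyBatch
{
    int  slots[TOY_MAX_SLOTS];  /* buffered "tuples" (just ints here) */
    bool valid[TOY_MAX_SLOTS];  /* models a slot that has not been cleared */
    int  cur_slots;             /* number of buffered tuples */
    int  nflushed;              /* hypothetical: size of the last flushed batch */
    bool clear_slots;           /* should the flush clear the slots itself? */
    bool flushed;               /* did the most recent call flush? */
} ToyBatch;

/* AM side: "write" the batch, clearing the slots only if asked to. */
static void
toy_flush(ToyBatch *b)
{
    b->flushed = (b->cur_slots > 0);
    if (!b->flushed)
        return;
    if (b->clear_slots)
        for (int i = 0; i < b->cur_slots; i++)
            b->valid[i] = false;
    b->nflushed = b->cur_slots;
    b->cur_slots = 0;
}

/* AM side: buffer one tuple and flush when the batch is full. */
static void
toy_add(ToyBatch *b, int value)
{
    assert(b->cur_slots < TOY_MAX_SLOTS);
    b->flushed = false;
    b->slots[b->cur_slots] = value;
    b->valid[b->cur_slots] = true;
    b->cur_slots++;
    if (b->cur_slots >= TOY_MAX_SLOTS)
        toy_flush(b);
}

/* Caller side: after a flush, "index" the still-valid slots, then clear them. */
static int
toy_after_flush(ToyBatch *b, int *indexed_sum)
{
    int n = 0;

    if (!b->flushed)
        return 0;
    for (int i = 0; i < b->nflushed; i++)
    {
        if (b->valid[i])
        {
            *indexed_sum += b->slots[i];
            b->valid[i] = false;
            n++;
        }
    }
    return n;
}
```

With clear_slots = false, the caller sees all four values after the flush. With clear_slots = true, the flush has already cleared them, which is only appropriate when nothing else needs the slots.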



  [application/octet-stream] v1-0004-COPY-With-New-Multi-and-Single-Insert-Table-AM.patch (22.4K, 4-v1-0004-COPY-With-New-Multi-and-Single-Insert-Table-AM.patch)
From 59bd7de19762241fa53eed6f64510f022345b14b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Tue, 8 Dec 2020 13:01:32 +0530
Subject: [PATCH v1] COPY With New Multi and Single Insert Table AM

This patch uses the new single and multi insert table access methods
in the COPY code.
---
 src/backend/commands/copyfrom.c | 483 +++++++++++---------------------
 1 file changed, 163 insertions(+), 320 deletions(-)

diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb..8376af32f5 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -45,10 +45,10 @@
 #include "utils/snapmgr.h"
 
 /*
- * No more than this many tuples per CopyMultiInsertBuffer
+ * No more than this many tuples per multi insert buffer
  *
  * Caution: Don't make this too big, as we could end up with this many
- * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
+ * multi insert buffer items stored in CopyMultiInsertInfo's
  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
  * memory requirements during copies into partitioned tables with a large
  * number of partitions.
@@ -67,31 +67,11 @@
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
-	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel */
-	int			nused;			/* number of 'slots' containing tuples */
-	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
-												 * stream */
+	TableInsertState *istate;
+	/* Line # of tuple in copy stream. */
+	uint64		linenos[MAX_BUFFERED_TUPLES];
 } CopyMultiInsertBuffer;
 
-/*
- * Stores one or many CopyMultiInsertBuffers and details about the size and
- * number of tuples which are stored in them.  This allows multiple buffers to
- * exist at once when COPYing into a partitioned table.
- */
-typedef struct CopyMultiInsertInfo
-{
-	List	   *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
-	int			bufferedTuples; /* number of tuples buffered over all buffers */
-	int			bufferedBytes;	/* number of bytes from all buffered tuples */
-	CopyFromState	cstate;			/* Copy state for this CopyMultiInsertInfo */
-	EState	   *estate;			/* Executor state used for COPY */
-	CommandId	mycid;			/* Command Id used for COPY */
-	int			ti_options;		/* table insert options */
-} CopyMultiInsertInfo;
-
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
@@ -204,227 +184,130 @@ limit_printout_length(const char *str)
 	return res;
 }
 
-/*
- * Allocate memory and initialize a new CopyMultiInsertBuffer for this
- * ResultRelInfo.
- */
-static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+static void
+InitCopyMultiInsertBufferInfo(List **mirri, ResultRelInfo *rri,
+							  CommandId mycid, int ti_options)
 {
 	CopyMultiInsertBuffer *buffer;
+	TriggerDesc *trigdesc = rri->ri_TrigDesc;
 
-	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
-	buffer->resultRelInfo = rri;
-	buffer->bistate = GetBulkInsertState();
-	buffer->nused = 0;
-
-	return buffer;
-}
+	buffer = (CopyMultiInsertBuffer *) palloc0(sizeof(CopyMultiInsertBuffer));
 
-/*
- * Make a new buffer for this ResultRelInfo.
- */
-static inline void
-CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
-{
-	CopyMultiInsertBuffer *buffer;
+	buffer->istate = table_insert_begin(rri->ri_RelationDesc,
+										mycid,
+										ti_options,
+										true,
+										true,
+										MAX_BUFFERED_TUPLES,
+										MAX_BUFFERED_BYTES);
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	if (rri->ri_NumIndices ||
+		(trigdesc && (trigdesc->trig_insert_after_row ||
+		trigdesc->trig_insert_new_table)))
+		buffer->istate->mistate->clear_slots = false;
 
-	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
-	/* Record that we're tracking this buffer */
-	miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-}
-
-/*
- * Initialize an already allocated CopyMultiInsertInfo.
- *
- * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
- * for that table.
- */
-static void
-CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						CopyFromState cstate, EState *estate, CommandId mycid,
-						int ti_options)
-{
-	miinfo->multiInsertBuffers = NIL;
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
-	miinfo->cstate = cstate;
-	miinfo->estate = estate;
-	miinfo->mycid = mycid;
-	miinfo->ti_options = ti_options;
 
-	/*
-	 * Only setup the buffer when not dealing with a partitioned table.
-	 * Buffers for partitioned tables will just be setup when we need to send
-	 * tuples their way for the first time.
-	 */
-	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+	*mirri = lappend(*mirri, rri);
 }
 
-/*
- * Returns true if the buffers are full
- */
-static inline bool
-CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
-{
-	if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
-		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
-		return true;
-	return false;
-}
-
-/*
- * Returns true if we have no buffered tuples
- */
-static inline bool
-CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
-{
-	return miinfo->bufferedTuples == 0;
-}
-
-/*
- * Write the tuples stored in 'buffer' out to the table.
- */
-static inline void
-CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
-						   CopyMultiInsertBuffer *buffer)
+static void
+HandleAfterRowEvents(ResultRelInfo *rri, EState *estate,
+					 CopyFromState cstate, int32 cur_slots)
 {
-	MemoryContext oldcontext;
-	int			i;
-	uint64		save_cur_lineno;
-	CopyFromState	cstate = miinfo->cstate;
-	EState	   *estate = miinfo->estate;
-	CommandId	mycid = miinfo->mycid;
-	int			ti_options = miinfo->ti_options;
+	int i;
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	TableInsertState *istate = buffer->istate;
+	uint64		save_cur_lineno = cstate->cur_lineno;
 	bool		line_buf_valid = cstate->line_buf_valid;
-	int			nused = buffer->nused;
-	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
-	TupleTableSlot **slots = buffer->slots;
 
-	/*
-	 * Print error context information correctly, if one of the operations
-	 * below fail.
-	 */
 	cstate->line_buf_valid = false;
-	save_cur_lineno = cstate->cur_lineno;
-
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
+	for (i = 0; i < cur_slots; i++)
 	{
 		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
-		 */
-		if (resultRelInfo->ri_NumIndices > 0)
+		 * If there are any indexes, update them for all the inserted tuples,
+		 * and run AFTER ROW INSERT triggers.
+		 */
+		if (rri->ri_NumIndices > 0)
 		{
-			List	   *recheckIndexes;
+			List       *recheckIndexes;
 
 			cstate->cur_lineno = buffer->linenos[i];
 			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, NULL,
-									  NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
+					ExecInsertIndexTuples(rri,
+										  istate->mistate->slots[i], estate,
+										  false,
+										  NULL,
+										  NULL);
+
+			ExecARInsertTriggers(estate,
+								 rri,
+								 istate->mistate->slots[i],
+								 recheckIndexes,
 								 cstate->transition_capture);
+
 			list_free(recheckIndexes);
 		}
 
 		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
-		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		 * There are no indexes, but see if we need to run AFTER ROW INSERT
+		 * triggers anyway.
+		 */
+		else if (rri->ri_TrigDesc != NULL &&
+				(rri->ri_TrigDesc->trig_insert_after_row ||
+				 rri->ri_TrigDesc->trig_insert_new_table))
 		{
 			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
+			ExecARInsertTriggers(estate,
+								 rri,
+								 istate->mistate->slots[i],
+								 NULL,
+								 cstate->transition_capture);
 		}
 
-		ExecClearTuple(slots[i]);
+		ExecClearTuple(istate->mistate->slots[i]);
 	}
 
-	/* Mark that all slots are free */
-	buffer->nused = 0;
-
 	/* reset cur_lineno and line_buf_valid to what they were */
 	cstate->line_buf_valid = line_buf_valid;
 	cstate->cur_lineno = save_cur_lineno;
 }
 
-/*
- * Drop used slots and free member for this buffer.
- *
- * The buffer must be flushed before cleanup.
- */
-static inline void
-CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
-							 CopyMultiInsertBuffer *buffer)
+static void
+CopyMultiInsertBufferTuple(ResultRelInfo *rri, TupleTableSlot *slot,
+						   CopyFromState cstate, EState *estate)
 {
-	int			i;
-
-	/* Ensure buffer was flushed */
-	Assert(buffer->nused == 0);
-
-	/* Remove back-link to ourself */
-	buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
-
-	FreeBulkInsertState(buffer->bistate);
+	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+	TableInsertState *istate = buffer->istate;
+	int32 cur_slots = istate->mistate->cur_slots;
 
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	buffer->linenos[istate->mistate->cur_slots] = cstate->cur_lineno;
+	istate->mistate->cur_tup_size = cstate->line_buf.len;
 
-	table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc,
-							 miinfo->ti_options);
+	table_multi_insert_v2(buffer->istate, slot);
 
-	pfree(buffer);
+	if (istate->mistate->flushed)
+		HandleAfterRowEvents(rri, estate, cstate, cur_slots);
 }
 
-/*
- * Write out all stored tuples in all buffers out to the tables.
- *
- * Once flushed we also trim the tracked buffers list down to size by removing
- * the buffers created earliest first.
- *
- * Callers should pass 'curr_rri' is the ResultRelInfo that's currently being
- * used.  When cleaning up old buffers we'll never remove the one for
- * 'curr_rri'.
- */
-static inline void
-CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
+static void
+CopyMulitInsertFlushBuffers(List **mirri, ResultRelInfo *curr_rri,
+							CopyFromState cstate, EState *estate)
 {
 	ListCell   *lc;
 
-	foreach(lc, miinfo->multiInsertBuffers)
+	foreach(lc, *mirri)
 	{
-		CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
+		ResultRelInfo *rri = lfirst(lc);
+		CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		TableInsertState *istate = buffer->istate;
+		int32 cur_slots = istate->mistate->cur_slots;
 
-		CopyMultiInsertBufferFlush(miinfo, buffer);
-	}
+		table_multi_insert_flush(istate);
 
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
+		if (istate->mistate->flushed)
+			HandleAfterRowEvents(rri, estate, cstate, cur_slots);
+	}
 
 	/*
 	 * Trim the list of tracked buffers down if it exceeds the limit.  Here we
@@ -432,87 +315,62 @@ CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri)
 	 * likely that these older ones will be needed than the ones that were
 	 * just created.
 	 */
-	while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
+	while (list_length(*mirri) > MAX_PARTITION_BUFFERS)
 	{
+		ResultRelInfo *rri;
 		CopyMultiInsertBuffer *buffer;
+		TableInsertState *istate;
+		int ti_options;
 
-		buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+		rri = (ResultRelInfo *) linitial(*mirri);
 
 		/*
 		 * We never want to remove the buffer that's currently being used, so
 		 * if we happen to find that then move it to the end of the list.
 		 */
-		if (buffer->resultRelInfo == curr_rri)
+		if (rri == curr_rri)
 		{
-			miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-			miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
-			buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
+			*mirri = list_delete_first(*mirri);
+			*mirri = lappend(*mirri, rri);
+			rri = (ResultRelInfo *) linitial(*mirri);
 		}
 
-		CopyMultiInsertBufferCleanup(miinfo, buffer);
-		miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
-	}
-}
+		buffer = rri->ri_CopyMultiInsertBuffer;
+		istate = buffer->istate;
+		istate->mistate->clear_slots = true;
+		ti_options = istate->options;
 
-/*
- * Cleanup allocated buffers and free memory
- */
-static inline void
-CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
-{
-	ListCell   *lc;
+		table_insert_end(istate);
 
-	foreach(lc, miinfo->multiInsertBuffers)
-		CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
+		table_finish_bulk_insert(rri->ri_RelationDesc, ti_options);
 
-	list_free(miinfo->multiInsertBuffers);
+		*mirri = list_delete_first(*mirri);
+	}
 }
 
-/*
- * Get the next TupleTableSlot that the next tuple should be stored in.
- *
- * Callers must ensure that the buffer is not full.
- *
- * Note: 'miinfo' is unused but has been included for consistency with the
- * other functions in this area.
- */
-static inline TupleTableSlot *
-CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
-								ResultRelInfo *rri)
+static void
+CopyMulitInsertDropBuffers(List *mirri)
 {
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
-	int			nused = buffer->nused;
-
-	Assert(buffer != NULL);
-	Assert(nused < MAX_BUFFERED_TUPLES);
+	ListCell   *lc;
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
-}
+	foreach(lc, mirri)
+	{
+		int ti_options;
+		ResultRelInfo *rri = lfirst(lc);
+		CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		TableInsertState *istate = buffer->istate;
 
-/*
- * Record the previously reserved TupleTableSlot that was reserved by
- * CopyMultiInsertInfoNextFreeSlot as being consumed.
- */
-static inline void
-CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
-						 TupleTableSlot *slot, int tuplen, uint64 lineno)
-{
-	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
+		istate->mistate->clear_slots = true;
+		ti_options = istate->options;
 
-	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+		table_insert_end(istate);
 
-	/* Store the line number so we can properly report any errors later */
-	buffer->linenos[buffer->nused] = lineno;
+		table_finish_bulk_insert(rri->ri_RelationDesc, ti_options);
 
-	/* Record this slot as being used */
-	buffer->nused++;
+		pfree(buffer);
+	}
 
-	/* Update how many tuples are stored and their size */
-	miinfo->bufferedTuples++;
-	miinfo->bufferedBytes += tuplen;
+	list_free(mirri);
 }
 
 /*
@@ -527,20 +385,20 @@ CopyFrom(CopyFromState cstate)
 	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
 	ModifyTableState *mtstate;
 	ExprContext *econtext;
-	TupleTableSlot *singleslot = NULL;
+	TupleTableSlot *slot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
 	CommandId	mycid = GetCurrentCommandId(true);
 	int			ti_options = 0; /* start with default options for insert */
-	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
 	uint64		processed = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
+	List	   *multi_insert_rris = NULL;
+	TableInsertState *istate = NULL;
 
 	Assert(cstate->rel);
 	Assert(list_length(cstate->range_table) == 1);
@@ -723,7 +581,7 @@ CopyFrom(CopyFromState cstate)
 		 * For partitioned tables we can't support multi-inserts when there
 		 * are any statement level insert triggers. It might be possible to
 		 * allow partitioned tables with such triggers in the future, but for
-		 * now, CopyMultiInsertInfoFlush expects that any before row insert
+		 * now, CopyMulitInsertFlushBuffers expects that any before row insert
 		 * and statement level insert triggers are on the same relation.
 		 */
 		insertMethod = CIM_SINGLE;
@@ -771,22 +629,22 @@ CopyFrom(CopyFromState cstate)
 		else
 			insertMethod = CIM_MULTI;
 
-		CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
-								estate, mycid, ti_options);
+		/*
+		 * Only set up the buffer when not dealing with a partitioned table.
+		 * Buffers for partitioned tables will just be set up when we need to
+		 * send tuples their way for the first time.
+		 */
+		if (!proute)
+			InitCopyMultiInsertBufferInfo(&multi_insert_rris, resultRelInfo,
+										  mycid, ti_options);
 	}
 
 	/*
-	 * If not using batch mode (which allocates slots as needed) set up a
-	 * tuple slot too. When inserting into a partitioned table, we also need
-	 * one, even if we might batch insert, to read the tuple in the root
-	 * partition's form.
+	 * Set up a tuple slot into which input data from the COPY stream is
+	 * read, and which is then used for inserts into the table.
 	 */
-	if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
-	{
-		singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
-									   &estate->es_tupleTable);
-		bistate = GetBulkInsertState();
-	}
+	slot = table_slot_create(resultRelInfo->ri_RelationDesc,
+							 &estate->es_tupleTable);
 
 	has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
 								  resultRelInfo->ri_TrigDesc->trig_insert_before_row);
@@ -824,19 +682,8 @@ CopyFrom(CopyFromState cstate)
 		ResetPerTupleExprContext(estate);
 
 		/* select slot to (initially) load row into */
-		if (insertMethod == CIM_SINGLE || proute)
-		{
-			myslot = singleslot;
-			Assert(myslot != NULL);
-		}
-		else
-		{
-			Assert(resultRelInfo == target_resultRelInfo);
-			Assert(insertMethod == CIM_MULTI);
-
-			myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
-													 resultRelInfo);
-		}
+		myslot = slot;
+		Assert(myslot != NULL);
 
 		/*
 		 * Switch to per-tuple context before calling NextCopyFrom, which does
@@ -904,21 +751,22 @@ CopyFrom(CopyFromState cstate)
 				if (leafpart_use_multi_insert)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
-						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+						InitCopyMultiInsertBufferInfo(&multi_insert_rris,
+													  resultRelInfo, mycid,
+													  ti_options);
 				}
-				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+				else if (insertMethod == CIM_MULTI_CONDITIONAL)
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
 					 * batching, so rows are visible to triggers etc.
 					 */
-					CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					CopyMulitInsertFlushBuffers(&multi_insert_rris,
+												resultRelInfo, cstate, estate);
 				}
 
-				if (bistate != NULL)
-					ReleaseBulkInsertStatePin(bistate);
+				if (istate && istate->bistate)
+					ReleaseBulkInsertStatePin(istate->bistate);
 				prevResultRelInfo = resultRelInfo;
 			}
 
@@ -960,8 +808,8 @@ CopyFrom(CopyFromState cstate)
 				/* no other path available for partitioned table */
 				Assert(insertMethod == CIM_MULTI_CONDITIONAL);
 
-				batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
-															resultRelInfo);
+				batchslot = table_slot_create(resultRelInfo->ri_RelationDesc,
+											  &estate->es_tupleTable);
 
 				if (map != NULL)
 					myslot = execute_attr_map_slot(map->attrMap, myslot,
@@ -1033,24 +881,9 @@ CopyFrom(CopyFromState cstate)
 				/* Store the slot in the multi-insert buffer, when enabled. */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
-					/*
-					 * The slot previously might point into the per-tuple
-					 * context. For batching it needs to be longer lived.
-					 */
-					ExecMaterializeSlot(myslot);
-
 					/* Add this tuple to the tuple buffer */
-					CopyMultiInsertInfoStore(&multiInsertInfo,
-											 resultRelInfo, myslot,
-											 cstate->line_buf.len,
-											 cstate->cur_lineno);
-
-					/*
-					 * If enough inserts have queued up, then flush all
-					 * buffers out to their tables.
-					 */
-					if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
-						CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
+					CopyMultiInsertBufferTuple(resultRelInfo, myslot, cstate,
+											   estate);
 				}
 				else
 				{
@@ -1076,9 +909,21 @@ CopyFrom(CopyFromState cstate)
 					}
 					else
 					{
+						if (!istate)
+						{
+							istate = table_insert_begin(resultRelInfo->ri_RelationDesc,
+														mycid,
+														ti_options,
+														true,
+														false,
+														-1,
+														-1);
+						}
+
+						istate->rel = resultRelInfo->ri_RelationDesc;
+
 						/* OK, store the tuple and create index entries for it */
-						table_tuple_insert(resultRelInfo->ri_RelationDesc,
-										   myslot, mycid, ti_options, bistate);
+						table_insert_v2(istate, myslot);
 
 						if (resultRelInfo->ri_NumIndices > 0)
 							recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
@@ -1108,16 +953,14 @@ CopyFrom(CopyFromState cstate)
 
 	/* Flush any remaining buffered tuples */
 	if (insertMethod != CIM_SINGLE)
-	{
-		if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
-	}
+		CopyMulitInsertFlushBuffers(&multi_insert_rris, resultRelInfo,
+									cstate, estate);
 
 	/* Done, clean up */
 	error_context_stack = errcallback.previous;
 
-	if (bistate != NULL)
-		FreeBulkInsertState(bistate);
+	if (istate)
+		table_insert_end(istate);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -1144,7 +987,7 @@ CopyFrom(CopyFromState cstate)
 
 	/* Tear down the multi-insert buffer data */
 	if (insertMethod != CIM_SINGLE)
-		CopyMultiInsertInfoCleanup(&multiInsertInfo);
+		CopyMulitInsertDropBuffers(multi_insert_rris);
 
 	/* Close all the partitioned tables, leaf partitions, and their indices */
 	if (proute)
-- 
2.25.1


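The COPY changes above move the flush decision (tuple count or byte limit) into the table AM; the caller only reacts when the AM reports that it flushed. Below is a minimal, self-contained C sketch of that control flow, not the patch's implementation. SketchMultiInsertState, sketch_multi_insert(), sketch_multi_insert_flush(), run_copy_sketch() and the SKETCH_MAX_* limits are hypothetical stand-ins for the mistate fields, table_multi_insert_v2(), table_multi_insert_flush() and the MAX_BUFFERED_* macros, and no real slots are stored. The sketch assumes the caller must know how many tuples a flush covered so it can run index insertion and AFTER ROW triggers for exactly those rows; here that count is reported through a hypothetical last_flush_count field.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for MAX_BUFFERED_TUPLES and MAX_BUFFERED_BYTES. */
#define SKETCH_MAX_TUPLES 4
#define SKETCH_MAX_BYTES  64

/*
 * Hypothetical, simplified model of the multi-insert state; no real
 * slots are stored, only the counters that drive the flush decision.
 */
typedef struct SketchMultiInsertState
{
	int		cur_slots;			/* tuples currently buffered */
	long	cur_size;			/* bytes currently buffered */
	int		cur_tup_size;		/* size of the next tuple, set by the caller */
	bool	flushed;			/* did the last call write the buffer out? */
	int		last_flush_count;	/* tuples written by that flush */
	int		total_written;		/* tuples written to the "table" so far */
} SketchMultiInsertState;

/* Write out whatever is buffered; a no-op when the buffer is empty. */
static void
sketch_multi_insert_flush(SketchMultiInsertState *m)
{
	m->flushed = false;
	if (m->cur_slots == 0)
		return;
	m->total_written += m->cur_slots;
	m->last_flush_count = m->cur_slots;
	m->cur_slots = 0;
	m->cur_size = 0;
	m->flushed = true;
}

/* Buffer one tuple; flush when either the count or the byte limit is hit. */
static void
sketch_multi_insert(SketchMultiInsertState *m)
{
	assert(m->cur_slots < SKETCH_MAX_TUPLES);
	m->flushed = false;
	m->cur_slots++;
	m->cur_size += m->cur_tup_size;
	if (m->cur_slots >= SKETCH_MAX_TUPLES || m->cur_size >= SKETCH_MAX_BYTES)
		sketch_multi_insert_flush(m);
}

/*
 * Caller side, loosely modeled on CopyMultiInsertBufferTuple(): after each
 * insert, do the per-row follow-up work (indexes, AFTER ROW triggers) only
 * for the tuples that were actually flushed.
 */
void
run_copy_sketch(int ntuples, int tup_size,
				int *written, int *after_row, int *nflushes)
{
	SketchMultiInsertState m = {0};

	*after_row = 0;
	*nflushes = 0;
	for (int i = 0; i < ntuples; i++)
	{
		m.cur_tup_size = tup_size;
		sketch_multi_insert(&m);
		if (m.flushed)
		{
			*after_row += m.last_flush_count;
			(*nflushes)++;
		}
	}

	/* End of input: flush the remainder, as CopyFrom() does at the end. */
	sketch_multi_insert_flush(&m);
	if (m.flushed)
	{
		*after_row += m.last_flush_count;
		(*nflushes)++;
	}
	*written = m.total_written;
}
```

For example, run_copy_sketch(10, 10, ...) flushes after the 4th and 8th tuples and once more at end of input, so all 10 rows receive their per-row follow-up work across 3 flushes. With 30-byte tuples the byte limit triggers first, after every 3rd tuple.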

  [application/octet-stream] v1-0002-CTAS-and-REFRESH-Mat-View-With-New-Multi-Insert-Table-AM.patch (6.5K, 5-v1-0002-CTAS-and-REFRESH-Mat-View-With-New-Multi-Insert-Table-AM.patch)
From e777510f323c09839a6dd9253a327f5dd4172a8b Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Tue, 8 Dec 2020 12:14:22 +0530
Subject: [PATCH v1] CTAS and REFRESH Mat View With New Multi Insert Table AM

This patch adds new multi insert table access methods to
CREATE TABLE AS, CREATE MATERIALIZED VIEW and REFRESH MATERIALIZED
VIEW.
---
 src/backend/commands/createas.c | 57 ++++++++++++++++++++-------------
 src/backend/commands/matview.c  | 43 ++++++++++++++-----------
 2 files changed, 59 insertions(+), 41 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 6bf6c5a310..4580cbae1d 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -58,9 +58,7 @@ typedef struct
 	/* These fields are filled by intorel_startup: */
 	Relation	rel;			/* relation to write to */
 	ObjectAddress reladdr;		/* address of rel, for ExecCreateTableAs */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *istate;	/* insert state */
 } DR_intorel;
 
 /* utility functions for CTAS definition creation */
@@ -523,22 +521,28 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	if (is_matview && !into->skipData)
 		SetMatViewPopulatedState(intoRelationDesc, true);
 
-	/*
-	 * Fill private fields of myState for use by later routines
-	 */
-	myState->rel = intoRelationDesc;
-	myState->reladdr = intoRelationAddr;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM;
-
 	/*
 	 * If WITH NO DATA is specified, there is no need to set up the state for
-	 * bulk inserts as there are no tuples to insert.
+	 * bulk inserts and multi inserts as there are no tuples to insert.
 	 */
 	if (!into->skipData)
-		myState->bistate = GetBulkInsertState();
+	{
+		myState->istate = table_insert_begin(intoRelationDesc,
+											 GetCurrentCommandId(true),
+											 TABLE_INSERT_SKIP_FSM,
+											 true,
+											 true,
+											 1000, /* TODO: replace with a macro */
+											 65535); /* TODO: replace with a macro */
+	}
 	else
-		myState->bistate = NULL;
+		myState->istate = NULL;
+
+	/*
+	 * Fill private fields of myState for use by later routines
+	 */
+	myState->rel = intoRelationDesc;
+	myState->reladdr = intoRelationAddr;
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -566,11 +570,7 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self)
 		 * would not be cheap either. This also doesn't allow accessing per-AM
 		 * data (say a tuple's xmin), but since we don't do that here...
 		 */
-		table_tuple_insert(myState->rel,
-						   slot,
-						   myState->output_cid,
-						   myState->ti_options,
-						   myState->bistate);
+		table_multi_insert_v2(myState->istate, slot);
 	}
 
 	/* We know this is a newly created relation, so there are no indexes */
@@ -585,12 +585,23 @@ static void
 intorel_shutdown(DestReceiver *self)
 {
 	DR_intorel *myState = (DR_intorel *) self;
-	IntoClause *into = myState->into;
+	int			ti_options;
 
-	if (!into->skipData)
+	if (!myState->into->skipData)
 	{
-		FreeBulkInsertState(myState->bistate);
-		table_finish_bulk_insert(myState->rel, myState->ti_options);
+		ti_options = myState->istate->options;
+
+		/*
+		 * Do not clear the buffered slots while flushing, as
+		 * table_insert_end will drop them anyway.
+		 */
+		myState->istate->mistate->clear_slots = false;
+
+		table_multi_insert_flush(myState->istate);
+
+		table_insert_end(myState->istate);
+
+		table_finish_bulk_insert(myState->rel, ti_options);
 	}
 
 	/* close rel, but keep lock until commit */
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index cfc63915f3..610c7ede78 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -52,10 +52,7 @@ typedef struct
 	DestReceiver pub;			/* publicly-known function pointers */
 	Oid			transientoid;	/* OID of new heap into which to store */
 	/* These fields are filled by transientrel_startup: */
-	Relation	transientrel;	/* relation to write to */
-	CommandId	output_cid;		/* cmin to insert in output tuples */
-	int			ti_options;		/* table_tuple_insert performance options */
-	BulkInsertState bistate;	/* bulk insert state */
+	TableInsertState *istate;	/* insert state */
 } DR_transientrel;
 
 static int	matview_maintenance_depth = 0;
@@ -466,10 +463,13 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
 	/*
 	 * Fill private fields of myState for use by later routines
 	 */
-	myState->transientrel = transientrel;
-	myState->output_cid = GetCurrentCommandId(true);
-	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
-	myState->bistate = GetBulkInsertState();
+	myState->istate = table_insert_begin(transientrel,
+										 GetCurrentCommandId(true),
+										 TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN,
+										 true,
+										 true,
+										 1000, /* TODO: replace with a macro */
+										 65535); /* TODO: replace with a macro */
 
 	/*
 	 * Valid smgr_targblock implies something already wrote to the relation.
@@ -494,12 +494,7 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
 	 * cheap either. This also doesn't allow accessing per-AM data (say a
 	 * tuple's xmin), but since we don't do that here...
 	 */
-
-	table_tuple_insert(myState->transientrel,
-					   slot,
-					   myState->output_cid,
-					   myState->ti_options,
-					   myState->bistate);
+	table_multi_insert_v2(myState->istate, slot);
 
 	/* We know this is a newly created relation, so there are no indexes */
 
@@ -513,14 +508,26 @@ static void
 transientrel_shutdown(DestReceiver *self)
 {
 	DR_transientrel *myState = (DR_transientrel *) self;
+	int			ti_options;
+	Relation	transientrel;
+
+	ti_options = myState->istate->options;
+	transientrel = myState->istate->rel;
+
+	/*
+	 * Do not clear the buffered slots while flushing, as
+	 * table_insert_end will drop them anyway.
+	 */
+	myState->istate->mistate->clear_slots = false;
+
+	table_multi_insert_flush(myState->istate);
 
-	FreeBulkInsertState(myState->bistate);
+	table_insert_end(myState->istate);
 
-	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+	table_finish_bulk_insert(transientrel, ti_options);
 
 	/* close transientrel, but keep lock until commit */
-	table_close(myState->transientrel, NoLock);
-	myState->transientrel = NULL;
+	table_close(transientrel, NoLock);
 }
 
 /*
-- 
2.25.1


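The CTAS and REFRESH MATERIALIZED VIEW receivers above follow a begin, insert-per-row, flush, end lifecycle. The minimal C sketch below illustrates why the shutdown callbacks flush before ending the insert state. It assumes (an assumption of this sketch, not something taken from the 0001 patch) that the end call only releases resources and never writes out buffered tuples. All names (SketchInsertState, sketch_insert_begin() and the other sketch_insert_* functions) are hypothetical, and tuples are plain ints.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, minimal model of a begin/insert/flush/end insert API. */
typedef struct SketchInsertState
{
	int	   *buf;			/* buffered "tuples" (plain ints here) */
	int		nbuf;			/* number of tuples currently buffered */
	int		max_slots;		/* flush once this many are buffered */
	int		nwritten;		/* tuples written to the "relation" */
	long	sum_written;	/* checksum of written tuples */
} SketchInsertState;

/* Allocate the state and its buffer; returns NULL on allocation failure. */
SketchInsertState *
sketch_insert_begin(int max_slots)
{
	SketchInsertState *s = calloc(1, sizeof(*s));

	if (s == NULL)
		return NULL;
	s->buf = calloc((size_t) max_slots, sizeof(int));
	if (s->buf == NULL)
	{
		free(s);
		return NULL;
	}
	s->max_slots = max_slots;
	return s;
}

/* Write every buffered tuple out and empty the buffer. */
void
sketch_insert_flush(SketchInsertState *s)
{
	for (int i = 0; i < s->nbuf; i++)
	{
		s->sum_written += s->buf[i];
		s->nwritten++;
	}
	s->nbuf = 0;
}

/* Buffer one tuple, flushing automatically when the buffer becomes full. */
void
sketch_insert_tuple(SketchInsertState *s, int tuple)
{
	s->buf[s->nbuf++] = tuple;
	if (s->nbuf == s->max_slots)
		sketch_insert_flush(s);
}

/*
 * Release the state.  Assumption for this sketch: end does NOT flush, so
 * the caller must call sketch_insert_flush() first or buffered rows are lost.
 */
void
sketch_insert_end(SketchInsertState *s)
{
	assert(s->nbuf == 0);
	free(s->buf);
	free(s);
}
```

With a threshold of 1000 (the value passed to table_insert_begin() in intorel_startup()), inserting 2500 rows writes 2000 of them during the receive phase. The remaining 500 reach the relation only through the explicit flush in the shutdown step.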
