public inbox for [email protected]  
help / color / mirror / Atom feed
From: Mircea Cadariu <[email protected]>
To: lakshmi <[email protected]>
To: Hayato Kuroda (Fujitsu) <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Heikki Linnakangas <[email protected]>
Subject: Re: parallel data loading for pgbench -i
Date: Fri, 8 May 2026 19:11:46 +0100
Message-ID: <[email protected]> (raw)
In-Reply-To: <OS9PR01MB12149E0E2CAAB378D264A35B1F5242@OS9PR01MB12149.jpnprd01.prod.outlook.com>
References: <[email protected]>
	<CAEvyyTj0rEsgcQOQgkARbRPbupHR_mc=TUzHBBLNzd8JByUUTw@mail.gmail.com>
	<[email protected]>
	<OS9PR01MB12149A6CEE200E1A5A88D9F14F563A@OS9PR01MB12149.jpnprd01.prod.outlook.com>
	<CAEvyyTiQqd=rv3XUxc0YEaW-feopksBveZKKjVZNeSVG=GrY+A@mail.gmail.com>
	<TYRPR01MB121560B291DA3CD262CC7A09AF568A@TYRPR01MB12156.jpnprd01.prod.outlook.com>
	<CAEvyyTjPWfvJLn3c_G_zLRffZ3=YqzMYj6c5znaNxpHyZAg3XQ@mail.gmail.com>
	<[email protected]>
	<CAEvyyTircZ-tHgap=J6Aog0CBgXp4Dqx6dHYyK1iqgfoT+8D_A@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<OS9PR01MB12149E0E2CAAB378D264A35B1F5242@OS9PR01MB12149.jpnprd01.prod.outlook.com>

Hi Lakshmi and Hayato,


Thanks a lot for your feedback.

Attached for your consideration is v4, in which I address your remarks.

-- 
Thanks,
Mircea Cadariu

From 6099f30cf78b0ed8608670ff07b8a71b8cf0d47c Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 3 May 2026 16:42:20 +0100
Subject: [PATCH v4] pgbench: parallelize account loading for range-partitioned
 tables

In init mode with range partitioning and client-side data generation,
-j > 1 loads pgbench_accounts in parallel.  Each worker thread creates
its assigned partitions as standalone tables and populates them with
COPY FREEZE in its own transaction; the main connection then attaches
them.
---
 doc/src/sgml/ref/pgbench.sgml                |   9 +
 src/bin/pgbench/pgbench.c                    | 258 +++++++++++++++++--
 src/bin/pgbench/t/001_pgbench_with_server.pl |  29 +++
 3 files changed, 269 insertions(+), 27 deletions(-)

diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..3594b731cc 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -382,6 +382,11 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
         the scaled number of accounts.
         Default is <literal>0</literal>, meaning no partitioning.
        </para>
+       <para>
+        With <option>-j</option> greater than 1 and
+        <option>--partition-method=range</option>, partitions are
+        loaded in parallel during client-side data generation.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -502,6 +507,10 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
         Clients are distributed as evenly as possible among available threads.
         Default is 1.
        </para>
+       <para>
+        In initialization mode (<option>-i</option>) with range partitioning, up to
+        <option>-j</option> threads load partitions in parallel (client-side generation).
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..aa21b653ce 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4817,6 +4817,34 @@ initDropTables(PGconn *con)
 					 "pgbench_tellers");
 }
 
+static void
+appendAccountsRangeForValues(PQExpBufferData *query, int p)
+{
+	int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+
+	appendPQExpBufferStr(query, " for values from (");
+	if (p == 1)
+		appendPQExpBufferStr(query, "minvalue");
+	else
+		appendPQExpBuffer(query, INT64_FORMAT, (p - 1) * part_size + 1);
+	appendPQExpBufferStr(query, ") to (");
+	if (p < partitions)
+		appendPQExpBuffer(query, INT64_FORMAT, p * part_size + 1);
+	else
+		appendPQExpBufferStr(query, "maxvalue");
+	appendPQExpBufferChar(query, ')');
+}
+
+static void
+getAccountsPartitionRows(int p, int64 *start_row, int64 *end_row)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	/* clamp: rounded-up part_size can push p * part_size past total_rows */
+	*start_row = Min((int64) (p - 1) * part_size, total_rows);
+	*end_row = Min((int64) p * part_size, total_rows);
+}
+
 /*
  * Create "pgbench_accounts" partitions if needed.
  *
@@ -4839,33 +4867,17 @@ createPartitions(PGconn *con)
 	{
 		if (partition_method == PART_RANGE)
 		{
-			int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
-
-			printfPQExpBuffer(&query,
-							  "create%s table pgbench_accounts_%d\n"
-							  "  partition of pgbench_accounts\n"
-							  "  for values from (",
-							  unlogged_tables ? " unlogged" : "", p);
-
 			/*
 			 * For RANGE, we use open-ended partitions at the beginning and
 			 * end to allow any valid value for the primary key.  Although the
 			 * actual minimum and maximum values can be derived from the
 			 * scale, it is more generic and the performance is better.
 			 */
-			if (p == 1)
-				appendPQExpBufferStr(&query, "minvalue");
-			else
-				appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1);
-
-			appendPQExpBufferStr(&query, ") to (");
-
-			if (p < partitions)
-				appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1);
-			else
-				appendPQExpBufferStr(&query, "maxvalue");
-
-			appendPQExpBufferChar(&query, ')');
+			printfPQExpBuffer(&query,
+							  "create%s table pgbench_accounts_%d\n"
+							  "  partition of pgbench_accounts",
+							  unlogged_tables ? " unlogged" : "", p);
+			appendAccountsRangeForValues(&query, p);
 		}
 		else if (partition_method == PART_HASH)
 			printfPQExpBuffer(&query,
@@ -4889,6 +4901,62 @@ createPartitions(PGconn *con)
 	termPQExpBuffer(&query);
 }
 
+static void
+createStandalonePartitions(PGconn *con, int part_start, int part_end)
+{
+	PQExpBufferData query;
+	const char *aid_type = (scale >= SCALE_32BIT_THRESHOLD) ? "bigint" : "int";
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	initPQExpBuffer(&query);
+
+	for (int p = part_start; p <= part_end; p++)
+	{
+		printfPQExpBuffer(&query,
+						  "create%s table pgbench_accounts_%d\n"
+						  "  (aid %s not null,\n"
+						  "   bid int,\n"
+						  "   abalance int,\n"
+						  "   filler character(84))\n"
+						  "  with (fillfactor=%d)",
+						  unlogged_tables ? " unlogged" : "", p,
+						  aid_type, fillfactor);
+
+		executeStatement(con, query.data);
+	}
+
+	termPQExpBuffer(&query);
+}
+
+static void
+attachStandalonePartitions(PGconn *con)
+{
+	PQExpBufferData query;
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	initPQExpBuffer(&query);
+
+	executeStatement(con, "begin");
+
+	for (int p = 1; p <= partitions; p++)
+	{
+		printfPQExpBuffer(&query,
+						  "alter table pgbench_accounts\n"
+						  "  attach partition pgbench_accounts_%d",
+						  p);
+		appendAccountsRangeForValues(&query, p);
+		executeStatement(con, query.data);
+	}
+
+	executeStatement(con, "commit");
+
+	termPQExpBuffer(&query);
+}
+
 /*
  * Create pgbench's standard tables
  */
@@ -4981,7 +5049,17 @@ initCreateTables(PGconn *con)
 	termPQExpBuffer(&query);
 
 	if (partition_method != PART_NONE)
+	{
+		/*
+		 * In the parallel range-partitioned case, partitions are created by
+		 * the worker threads (so each one can use COPY FREEZE in its own
+		 * transaction) and attached afterwards.
+		 */
+		if (partition_method == PART_RANGE && nthreads > 1)
+			return;
+
 		createPartitions(con);
+	}
 }
 
 /*
@@ -5143,6 +5221,121 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	termPQExpBuffer(&sql);
 }
 
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	int64		start_row;
+	int64		end_row;
+	char		copy_stmt[256];
+	PGresult   *res;
+	PQExpBufferData sql;
+	int64		row;
+
+	getAccountsPartitionRows(partno, &start_row, &end_row);
+
+	snprintf(copy_stmt, sizeof(copy_stmt),
+			 PQserverVersion(con) >= 140000 ?
+			 "copy pgbench_accounts_%d from stdin with (freeze on)" :
+			 "copy pgbench_accounts_%d from stdin",
+			 partno);
+
+	res = PQexec(con, copy_stmt);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(res);
+
+	initPQExpBuffer(&sql);
+
+	for (row = start_row; row < end_row; row++)
+	{
+		initAccount(&sql, row);
+		if (PQputCopyData(con, sql.data, sql.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d", partno);
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d", partno);
+
+	res = PQgetResult(con);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con));
+	PQclear(res);
+
+	termPQExpBuffer(&sql);
+}
+
+typedef struct PartitionWorkerData
+{
+	int			thread_id;
+	int			part_start;
+	int			part_end;
+}			PartitionWorkerData;
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+	PartitionWorkerData *data = (PartitionWorkerData *) arg;
+	PGconn	   *con = doConnect();
+	int			p;
+
+	if (con == NULL)
+		pg_fatal("could not create connection for partition worker (parts %d-%d)",
+				 data->part_start, data->part_end);
+
+	executeStatement(con, "begin");
+	createStandalonePartitions(con, data->part_start, data->part_end);
+	for (p = data->part_start; p <= data->part_end; p++)
+	{
+		pg_time_usec_t start = pg_time_now();
+
+		initPopulatePartition(con, p);
+		fprintf(stderr, "partition %d loaded by thread %d (in %.2f s)\n",
+				p, data->thread_id,
+				PG_TIME_GET_DOUBLE(pg_time_now() - start));
+	}
+	executeStatement(con, "commit");
+
+	PQfinish(con);
+	THREAD_FUNC_RETURN;
+}
+
+/* Load the pgbench_accounts partitions using up to nthreads threads. */
+static void
+initLoadAccountsParallel(void)
+{
+	/* never start more workers (and connections) than there are partitions */
+	int			nworkers = Min(nthreads, partitions);
+	int			parts_per_worker = partitions / nworkers;
+	int			extra_parts = partitions % nworkers;
+	int			next_part = 1;
+	THREAD_T   *threads = pg_malloc_array(THREAD_T, nworkers);
+	PartitionWorkerData *data = pg_malloc_array(PartitionWorkerData, nworkers);
+
+	fprintf(stderr, "creating %d partitions...\n", partitions);
+	fprintf(stderr, "loading pgbench_accounts with %d threads...\n", nworkers);
+
+	for (int i = 0; i < nworkers; i++)
+	{
+		/* the first extra_parts workers each take one additional partition */
+		data[i].thread_id = i;
+		data[i].part_start = next_part;
+		data[i].part_end = next_part + parts_per_worker - 1 +
+			(i < extra_parts ? 1 : 0);
+		next_part = data[i].part_end + 1;
+
+		errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &data[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for worker %d: %m", i);
+	}
+
+	for (int i = 0; i < nworkers; i++)
+		THREAD_JOIN(threads[i]);
+
+	free(threads);
+	free(data);
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5155,8 +5348,11 @@ initGenerateDataClientSide(PGconn *con)
 	fprintf(stderr, "generating data (client-side)...\n");
 
 	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
+	 * For serial loading, do everything in one transaction to enable the
+	 * backend's data-loading optimizations.  For parallel loading
+	 * (range-partitioned, -j > 1), load branches and tellers in one
+	 * transaction, then load accounts in parallel with each worker in its own
+	 * transaction.
 	 */
 	executeStatement(con, "begin");
 
@@ -5169,9 +5365,18 @@ initGenerateDataClientSide(PGconn *con)
 	 */
 	initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
 	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-	executeStatement(con, "commit");
+	if (partition_method == PART_RANGE && nthreads > 1)
+	{
+		executeStatement(con, "commit");
+		initLoadAccountsParallel();
+		attachStandalonePartitions(con);
+	}
+	else
+	{
+		initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+		executeStatement(con, "commit");
+	}
 }
 
 /*
@@ -6944,7 +7149,6 @@ main(int argc, char **argv)
 				initialization_option_set = true;
 				break;
 			case 'j':			/* jobs */
-				benchmarking_option_set = true;
 				if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
 									  &nthreads))
 				{
@@ -7176,7 +7380,7 @@ main(int argc, char **argv)
 	 * optimization; throttle_delay is calculated incorrectly below if some
 	 * threads have no clients assigned to them.)
 	 */
-	if (nthreads > nclients)
+	if (nthreads > nclients && !is_init_mode)
 		nthreads = nclients;
 
 	/*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..29ee28d616 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,35 @@ $node->pgbench(
 # Check data state, after server-side data generation.
 check_data_state($node, 'server-side');
 
+# Test parallel initialization with range partitions (client-side generation).
+# -j sets the number of worker threads; each loads a contiguous set of partitions.
+$node->pgbench(
+	'-i -j 2 -s 1 --partitions=4 --partition-method=range',
+	0,
+	[qr{^$}],
+	[
+		qr{creating tables},
+		qr{creating 4 partitions},
+		qr{loading pgbench_accounts with 2 threads},
+		qr{partition \d loaded by thread \d \(in \d+\.\d\d s\)},
+		qr{vacuuming},
+		qr{creating primary keys},
+		qr{done in \d+\.\d\d s }
+	],
+	'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
+# Uneven distribution: 5 partitions across 2 threads (3 + 2).
+$node->pgbench(
+	'-i -j 2 -s 1 --partitions=5 --partition-method=range',
+	0,
+	[qr{^$}],
+	[ qr{loading pgbench_accounts with 2 threads}, qr{done in \d+\.\d\d s } ],
+	'pgbench parallel init with uneven partition distribution');
+
+check_data_state($node, 'parallel-range-uneven');
+
 # Run all builtin scripts, for a few transactions each
 $node->pgbench(
 	'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
-- 
2.39.5 (Apple Git-154)



Attachments:

  [text/plain] v4-0001-pgbench-parallelize-account-loading-for-range-partit.patch (12.4K, 2-v4-0001-pgbench-parallelize-account-loading-for-range-partit.patch)
  download | inline diff:
From 6099f30cf78b0ed8608670ff07b8a71b8cf0d47c Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 3 May 2026 16:42:20 +0100
Subject: [PATCH v4] pgbench: parallelize account loading for range-partitioned
 tables

In init mode with range partitioning and client-side data generation,
-j > 1 loads pgbench_accounts in parallel.  Each worker thread creates
its assigned partitions as standalone tables and populates them with
COPY FREEZE in its own transaction; the main connection then attaches
them.
---
 doc/src/sgml/ref/pgbench.sgml                |   9 +
 src/bin/pgbench/pgbench.c                    | 258 +++++++++++++++++--
 src/bin/pgbench/t/001_pgbench_with_server.pl |  29 +++
 3 files changed, 269 insertions(+), 27 deletions(-)

diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..3594b731cc 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -382,6 +382,11 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
         the scaled number of accounts.
         Default is <literal>0</literal>, meaning no partitioning.
        </para>
+       <para>
+        With <option>-j</option> greater than 1 and
+        <option>--partition-method=range</option>, partitions are
+        loaded in parallel during client-side data generation.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -502,6 +507,10 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
         Clients are distributed as evenly as possible among available threads.
         Default is 1.
        </para>
+       <para>
+        In initialization mode (<option>-i</option>) with range partitioning, up to
+        <option>-j</option> threads load partitions in parallel (client-side generation).
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..aa21b653ce 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4817,6 +4817,34 @@ initDropTables(PGconn *con)
 					 "pgbench_tellers");
 }
 
+static void
+appendAccountsRangeForValues(PQExpBufferData *query, int p)
+{
+	int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+
+	appendPQExpBufferStr(query, " for values from (");
+	if (p == 1)
+		appendPQExpBufferStr(query, "minvalue");
+	else
+		appendPQExpBuffer(query, INT64_FORMAT, (p - 1) * part_size + 1);
+	appendPQExpBufferStr(query, ") to (");
+	if (p < partitions)
+		appendPQExpBuffer(query, INT64_FORMAT, p * part_size + 1);
+	else
+		appendPQExpBufferStr(query, "maxvalue");
+	appendPQExpBufferChar(query, ')');
+}
+
+static void
+getAccountsPartitionRows(int p, int64 *start_row, int64 *end_row)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	/* clamp: rounded-up part_size can push p * part_size past total_rows */
+	*start_row = Min((int64) (p - 1) * part_size, total_rows);
+	*end_row = Min((int64) p * part_size, total_rows);
+}
+
 /*
  * Create "pgbench_accounts" partitions if needed.
  *
@@ -4839,33 +4867,17 @@ createPartitions(PGconn *con)
 	{
 		if (partition_method == PART_RANGE)
 		{
-			int64		part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
-
-			printfPQExpBuffer(&query,
-							  "create%s table pgbench_accounts_%d\n"
-							  "  partition of pgbench_accounts\n"
-							  "  for values from (",
-							  unlogged_tables ? " unlogged" : "", p);
-
 			/*
 			 * For RANGE, we use open-ended partitions at the beginning and
 			 * end to allow any valid value for the primary key.  Although the
 			 * actual minimum and maximum values can be derived from the
 			 * scale, it is more generic and the performance is better.
 			 */
-			if (p == 1)
-				appendPQExpBufferStr(&query, "minvalue");
-			else
-				appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1);
-
-			appendPQExpBufferStr(&query, ") to (");
-
-			if (p < partitions)
-				appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1);
-			else
-				appendPQExpBufferStr(&query, "maxvalue");
-
-			appendPQExpBufferChar(&query, ')');
+			printfPQExpBuffer(&query,
+							  "create%s table pgbench_accounts_%d\n"
+							  "  partition of pgbench_accounts",
+							  unlogged_tables ? " unlogged" : "", p);
+			appendAccountsRangeForValues(&query, p);
 		}
 		else if (partition_method == PART_HASH)
 			printfPQExpBuffer(&query,
@@ -4889,6 +4901,62 @@ createPartitions(PGconn *con)
 	termPQExpBuffer(&query);
 }
 
+static void
+createStandalonePartitions(PGconn *con, int part_start, int part_end)
+{
+	PQExpBufferData query;
+	const char *aid_type = (scale >= SCALE_32BIT_THRESHOLD) ? "bigint" : "int";
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	initPQExpBuffer(&query);
+
+	for (int p = part_start; p <= part_end; p++)
+	{
+		printfPQExpBuffer(&query,
+						  "create%s table pgbench_accounts_%d\n"
+						  "  (aid %s not null,\n"
+						  "   bid int,\n"
+						  "   abalance int,\n"
+						  "   filler character(84))\n"
+						  "  with (fillfactor=%d)",
+						  unlogged_tables ? " unlogged" : "", p,
+						  aid_type, fillfactor);
+
+		executeStatement(con, query.data);
+	}
+
+	termPQExpBuffer(&query);
+}
+
+static void
+attachStandalonePartitions(PGconn *con)
+{
+	PQExpBufferData query;
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	initPQExpBuffer(&query);
+
+	executeStatement(con, "begin");
+
+	for (int p = 1; p <= partitions; p++)
+	{
+		printfPQExpBuffer(&query,
+						  "alter table pgbench_accounts\n"
+						  "  attach partition pgbench_accounts_%d",
+						  p);
+		appendAccountsRangeForValues(&query, p);
+		executeStatement(con, query.data);
+	}
+
+	executeStatement(con, "commit");
+
+	termPQExpBuffer(&query);
+}
+
 /*
  * Create pgbench's standard tables
  */
@@ -4981,7 +5049,17 @@ initCreateTables(PGconn *con)
 	termPQExpBuffer(&query);
 
 	if (partition_method != PART_NONE)
+	{
+		/*
+		 * In the parallel range-partitioned case, partitions are created by
+		 * the worker threads (so each one can use COPY FREEZE in its own
+		 * transaction) and attached afterwards.
+		 */
+		if (partition_method == PART_RANGE && nthreads > 1)
+			return;
+
 		createPartitions(con);
+	}
 }
 
 /*
@@ -5143,6 +5221,121 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	termPQExpBuffer(&sql);
 }
 
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	int64		start_row;
+	int64		end_row;
+	char		copy_stmt[256];
+	PGresult   *res;
+	PQExpBufferData sql;
+	int64		row;
+
+	getAccountsPartitionRows(partno, &start_row, &end_row);
+
+	snprintf(copy_stmt, sizeof(copy_stmt),
+			 PQserverVersion(con) >= 140000 ?
+			 "copy pgbench_accounts_%d from stdin with (freeze on)" :
+			 "copy pgbench_accounts_%d from stdin",
+			 partno);
+
+	res = PQexec(con, copy_stmt);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(res);
+
+	initPQExpBuffer(&sql);
+
+	for (row = start_row; row < end_row; row++)
+	{
+		initAccount(&sql, row);
+		if (PQputCopyData(con, sql.data, sql.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d", partno);
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d", partno);
+
+	res = PQgetResult(con);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con));
+	PQclear(res);
+
+	termPQExpBuffer(&sql);
+}
+
+typedef struct PartitionWorkerData
+{
+	int			thread_id;
+	int			part_start;
+	int			part_end;
+}			PartitionWorkerData;
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+	PartitionWorkerData *data = (PartitionWorkerData *) arg;
+	PGconn	   *con = doConnect();
+	int			p;
+
+	if (con == NULL)
+		pg_fatal("could not create connection for partition worker (parts %d-%d)",
+				 data->part_start, data->part_end);
+
+	executeStatement(con, "begin");
+	createStandalonePartitions(con, data->part_start, data->part_end);
+	for (p = data->part_start; p <= data->part_end; p++)
+	{
+		pg_time_usec_t start = pg_time_now();
+
+		initPopulatePartition(con, p);
+		fprintf(stderr, "partition %d loaded by thread %d (in %.2f s)\n",
+				p, data->thread_id,
+				PG_TIME_GET_DOUBLE(pg_time_now() - start));
+	}
+	executeStatement(con, "commit");
+
+	PQfinish(con);
+	THREAD_FUNC_RETURN;
+}
+
+/* Load the pgbench_accounts partitions using up to nthreads threads. */
+static void
+initLoadAccountsParallel(void)
+{
+	/* never start more workers (and connections) than there are partitions */
+	int			nworkers = Min(nthreads, partitions);
+	int			parts_per_worker = partitions / nworkers;
+	int			extra_parts = partitions % nworkers;
+	int			next_part = 1;
+	THREAD_T   *threads = pg_malloc_array(THREAD_T, nworkers);
+	PartitionWorkerData *data = pg_malloc_array(PartitionWorkerData, nworkers);
+
+	fprintf(stderr, "creating %d partitions...\n", partitions);
+	fprintf(stderr, "loading pgbench_accounts with %d threads...\n", nworkers);
+
+	for (int i = 0; i < nworkers; i++)
+	{
+		/* the first extra_parts workers each take one additional partition */
+		data[i].thread_id = i;
+		data[i].part_start = next_part;
+		data[i].part_end = next_part + parts_per_worker - 1 +
+			(i < extra_parts ? 1 : 0);
+		next_part = data[i].part_end + 1;
+
+		errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &data[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for worker %d: %m", i);
+	}
+
+	for (int i = 0; i < nworkers; i++)
+		THREAD_JOIN(threads[i]);
+
+	free(threads);
+	free(data);
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5155,8 +5348,11 @@ initGenerateDataClientSide(PGconn *con)
 	fprintf(stderr, "generating data (client-side)...\n");
 
 	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
+	 * For serial loading, do everything in one transaction to enable the
+	 * backend's data-loading optimizations.  For parallel loading
+	 * (range-partitioned, -j > 1), load branches and tellers in one
+	 * transaction, then load accounts in parallel with each worker in its own
+	 * transaction.
 	 */
 	executeStatement(con, "begin");
 
@@ -5169,9 +5365,18 @@ initGenerateDataClientSide(PGconn *con)
 	 */
 	initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
 	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-	executeStatement(con, "commit");
+	if (partition_method == PART_RANGE && nthreads > 1)
+	{
+		executeStatement(con, "commit");
+		initLoadAccountsParallel();
+		attachStandalonePartitions(con);
+	}
+	else
+	{
+		initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+		executeStatement(con, "commit");
+	}
 }
 
 /*
@@ -6944,7 +7149,6 @@ main(int argc, char **argv)
 				initialization_option_set = true;
 				break;
 			case 'j':			/* jobs */
-				benchmarking_option_set = true;
 				if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
 									  &nthreads))
 				{
@@ -7176,7 +7380,7 @@ main(int argc, char **argv)
 	 * optimization; throttle_delay is calculated incorrectly below if some
 	 * threads have no clients assigned to them.)
 	 */
-	if (nthreads > nclients)
+	if (nthreads > nclients && !is_init_mode)
 		nthreads = nclients;
 
 	/*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..29ee28d616 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,35 @@ $node->pgbench(
 # Check data state, after server-side data generation.
 check_data_state($node, 'server-side');
 
+# Test parallel initialization with range partitions (client-side generation).
+# -j sets the number of worker threads; each loads a contiguous set of partitions.
+$node->pgbench(
+	'-i -j 2 -s 1 --partitions=4 --partition-method=range',
+	0,
+	[qr{^$}],
+	[
+		qr{creating tables},
+		qr{creating 4 partitions},
+		qr{loading pgbench_accounts with 2 threads},
+		qr{partition \d loaded by thread \d \(in \d+\.\d\d s\)},
+		qr{vacuuming},
+		qr{creating primary keys},
+		qr{done in \d+\.\d\d s }
+	],
+	'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
+# Uneven distribution: 5 partitions across 2 threads (3 + 2).
+$node->pgbench(
+	'-i -j 2 -s 1 --partitions=5 --partition-method=range',
+	0,
+	[qr{^$}],
+	[ qr{loading pgbench_accounts with 2 threads}, qr{done in \d+\.\d\d s } ],
+	'pgbench parallel init with uneven partition distribution');
+
+check_data_state($node, 'parallel-range-uneven');
+
 # Run all builtin scripts, for a few transactions each
 $node->pgbench(
 	'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
-- 
2.39.5 (Apple Git-154)



view thread (11+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
  Subject: Re: parallel data loading for pgbench -i
  In-Reply-To: <[email protected]>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox