public inbox for [email protected]  
help / color / mirror / Atom feed
From: Mircea Cadariu <[email protected]>
To: Heikki Linnakangas <[email protected]>
To: lakshmi <[email protected]>
Cc: Hayato Kuroda (Fujitsu) <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: parallel data loading for pgbench -i
Date: Fri, 10 Apr 2026 19:37:09 +0100
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>
	<CAEvyyTj0rEsgcQOQgkARbRPbupHR_mc=TUzHBBLNzd8JByUUTw@mail.gmail.com>
	<[email protected]>
	<OS9PR01MB12149A6CEE200E1A5A88D9F14F563A@OS9PR01MB12149.jpnprd01.prod.outlook.com>
	<CAEvyyTiQqd=rv3XUxc0YEaW-feopksBveZKKjVZNeSVG=GrY+A@mail.gmail.com>
	<TYRPR01MB121560B291DA3CD262CC7A09AF568A@TYRPR01MB12156.jpnprd01.prod.outlook.com>
	<CAEvyyTjPWfvJLn3c_G_zLRffZ3=YqzMYj6c5znaNxpHyZAg3XQ@mail.gmail.com>
	<[email protected]>
	<CAEvyyTircZ-tHgap=J6Aog0CBgXp4Dqx6dHYyK1iqgfoT+8D_A@mail.gmail.com>
	<[email protected]>

Hi,

On 07/04/2026 10:00, Heikki Linnakangas wrote:
>
> This all makes more sense in the partitioned case. Perhaps we should 
> parallelize only when partitioned are used, and use only one thread 
> per partition.
>
Thanks for having a look. I attached v3, which parallelizes only the
range-partitioned case, using one thread per partition. Results:


patch:

pgbench -i -s 100 --partitions 10

done in 12.63 s (drop tables 0.05 s, create tables 0.01 s, client-side 
generate 5.98 s, vacuum 1.63 s, primary keys 4.96 s).


master:

pgbench -i -s 100 --partitions 10

done in 29.29 s (drop tables 0.00 s, create tables 0.02 s, client-side 
generate 16.31 s, vacuum 7.78 s, primary keys 5.18 s).

-- 
Thanks,
Mircea Cadariu

From dd4f3e2d7dbae6b008157f4928287056fd0a82b9 Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Wed, 8 Apr 2026 15:35:31 +0100
Subject: [PATCH] pgbench: parallelize account loading for range-partitioned
 tables

When initializing with range partitioning, spawn one worker thread per
partition to load pgbench_accounts in parallel.  Each worker opens its
own connection, truncates its partition within a transaction, and loads
its rows with COPY FREEZE (on v14 and later servers), which avoids a
separate freeze pass during the subsequent vacuum step.

Non-partitioned and hash-partitioned tables are unaffected and continue
to use serial loading.
---
 src/bin/pgbench/pgbench.c                    | 120 ++++++++++++++++++-
 src/bin/pgbench/t/001_pgbench_with_server.pl |  18 +++
 2 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..f537d46393 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -5143,6 +5143,145 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	termPQExpBuffer(&sql);
 }
 
+/*
+ * Load the pgbench_accounts rows that belong to range partition "partno"
+ * (1-based) over connection "con", which the caller must already have
+ * inside a transaction block.
+ *
+ * The partition is truncated before the COPY so that, on v14 and later
+ * servers, COPY FREEZE can be used: it requires the table to have been
+ * created or truncated earlier in the same transaction.
+ *
+ * Rows are split into chunks of ceil(total_rows / partitions), which must
+ * agree with the bounds the range partitions were created with.  Both ends
+ * of a chunk are clamped to total_rows: with many partitions the rounded-up
+ * chunk size can carry partitions before the last one past the end of the
+ * table, which would otherwise generate rows beyond naccounts * scale.
+ */
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	int64		start_row = Min((int64) (partno - 1) * part_size, total_rows);
+	int64		end_row = Min((int64) partno * part_size, total_rows);
+	char		table_name[NAMEDATALEN];
+	char		truncate_stmt[256];
+	char		copy_stmt[256];
+	int			n;
+	PGresult   *res;
+	PQExpBufferData sql;
+	int64		row;
+
+	Assert(partno >= 1 && partno <= partitions);
+
+	snprintf(table_name, sizeof(table_name), "pgbench_accounts_%d", partno);
+	snprintf(truncate_stmt, sizeof(truncate_stmt), "truncate %s", table_name);
+
+	if (PQserverVersion(con) >= 140000)
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin with (freeze on)", table_name);
+	else
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin", table_name);
+
+	if (n < 0)
+		pg_fatal("invalid format string");
+	if ((size_t) n >= sizeof(copy_stmt))
+		pg_fatal("invalid buffer size: must be at least %d characters long", n);
+
+	executeStatement(con, truncate_stmt);
+
+	res = PQexec(con, copy_stmt);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(res);
+
+	initPQExpBuffer(&sql);
+
+	for (row = start_row; row < end_row; row++)
+	{
+		initAccount(&sql, row);
+		if (PQputCopyData(con, sql.data, sql.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d: %s",
+					 partno, PQerrorMessage(con));
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d: %s",
+				 partno, PQerrorMessage(con));
+
+	res = PQgetResult(con);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con));
+	PQclear(res);
+
+	/* the COPY status should be the last result; drain any others */
+	while ((res = PQgetResult(con)) != NULL)
+		PQclear(res);
+
+	termPQExpBuffer(&sql);
+}
+
+/*
+ * Thread body for one partition worker.  "arg" points to the 1-based
+ * partition number to load.  Each worker uses its own connection, since a
+ * libpq connection must not be used by several threads at once.
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+	int			partno = *(const int *) arg;
+	PGconn	   *con = doConnect();
+
+	if (con == NULL)
+		pg_fatal("could not create connection for partition worker %d", partno);
+
+	/* TRUNCATE and COPY must share this transaction for COPY FREEZE */
+	executeStatement(con, "begin");
+	initPopulatePartition(con, partno);
+	executeStatement(con, "commit");
+
+	PQfinish(con);
+	THREAD_FUNC_RETURN;
+}
+
+/*
+ * Load pgbench_accounts in parallel: start one worker thread per range
+ * partition, then wait for all of them to finish.
+ *
+ * NOTE(review): this opens "partitions" threads and server connections at
+ * the same time, so a large --partitions value can run into the server's
+ * connection limit; consider capping the number of concurrent workers.
+ */
+static void
+initLoadAccountsParallel(void)
+{
+	THREAD_T   *threads;
+	int		   *partno;
+
+	fprintf(stderr, "loading pgbench_accounts with %d threads...\n", partitions);
+
+	threads = pg_malloc_array(THREAD_T, partitions);
+	/* each thread reads its own slot, which stays valid past the join */
+	partno = pg_malloc_array(int, partitions);
+
+	for (int i = 0; i < partitions; i++)
+	{
+		partno[i] = i + 1;
+		errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &partno[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for partition %d: %m", i + 1);
+	}
+
+	for (int i = 0; i < partitions; i++)
+		THREAD_JOIN(threads[i]);
+
+	pg_free(threads);
+	pg_free(partno);
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5155,8 +5255,11 @@ initGenerateDataClientSide(PGconn *con)
 	fprintf(stderr, "generating data (client-side)...\n");
 
 	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
+	 * For the non-partitioned and hash-partitioned cases, do everything in
+	 * one transaction to enable the backend's data-loading optimizations. For
+	 * range-partitioned tables, branches and tellers are loaded in one
+	 * transaction, then accounts are loaded in parallel with one thread per
+	 * partition, each in its own transaction.
 	 */
 	executeStatement(con, "begin");
 
@@ -5169,9 +5272,18 @@ initGenerateDataClientSide(PGconn *con)
 	 */
 	initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
 	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-	executeStatement(con, "commit");
+	if (partition_method == PART_RANGE)
+	{
+		executeStatement(con, "commit");
+		initLoadAccountsParallel();
+	}
+	else
+	{
+		/* hash partitioning and non-partitioned tables use serial loading */
+		initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+		executeStatement(con, "commit");
+	}
 }
 
 /*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..b59c181c2a 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,24 @@ $node->pgbench(
 # Check data state, after server-side data generation.
 check_data_state($node, 'server-side');
 
+# Test parallel initialization with range partitions (client-side generation).
+# One thread per partition is spawned automatically.
+$node->pgbench(
+	'-i -s 1 --partitions=4 --partition-method=range',
+	0,
+	[qr{^$}],
+	[
+		qr{creating tables},
+		qr{creating 4 partitions},
+		qr{loading pgbench_accounts with 4 threads},
+		qr{vacuuming},
+		qr{creating primary keys},
+		qr{done in \d+\.\d\d s }
+	],
+	'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
 # Run all builtin scripts, for a few transactions each
 $node->pgbench(
 	'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
-- 
2.39.5 (Apple Git-154)



Attachments:

  [text/plain] v3-0001-pgbench-parallelize-account-loading-for-range-partit.patch (6.1K, 2-v3-0001-pgbench-parallelize-account-loading-for-range-partit.patch)
  download | inline diff:
From dd4f3e2d7dbae6b008157f4928287056fd0a82b9 Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Wed, 8 Apr 2026 15:35:31 +0100
Subject: [PATCH] pgbench: parallelize account loading for range-partitioned
 tables

When initializing with range partitioning, spawn one worker thread per
partition to load pgbench_accounts in parallel.  Each worker opens its
own connection, truncates its partition within a transaction, and loads
its rows with COPY FREEZE (on v14 and later servers), which avoids a
separate freeze pass during the subsequent vacuum step.

Non-partitioned and hash-partitioned tables are unaffected and continue
to use serial loading.
---
 src/bin/pgbench/pgbench.c                    | 120 ++++++++++++++++++-
 src/bin/pgbench/t/001_pgbench_with_server.pl |  18 +++
 2 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..f537d46393 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -5143,6 +5143,145 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
 	termPQExpBuffer(&sql);
 }
 
+/*
+ * Load the pgbench_accounts rows that belong to range partition "partno"
+ * (1-based) over connection "con", which the caller must already have
+ * inside a transaction block.
+ *
+ * The partition is truncated before the COPY so that, on v14 and later
+ * servers, COPY FREEZE can be used: it requires the table to have been
+ * created or truncated earlier in the same transaction.
+ *
+ * Rows are split into chunks of ceil(total_rows / partitions), which must
+ * agree with the bounds the range partitions were created with.  Both ends
+ * of a chunk are clamped to total_rows: with many partitions the rounded-up
+ * chunk size can carry partitions before the last one past the end of the
+ * table, which would otherwise generate rows beyond naccounts * scale.
+ */
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	int64		start_row = Min((int64) (partno - 1) * part_size, total_rows);
+	int64		end_row = Min((int64) partno * part_size, total_rows);
+	char		table_name[NAMEDATALEN];
+	char		truncate_stmt[256];
+	char		copy_stmt[256];
+	int			n;
+	PGresult   *res;
+	PQExpBufferData sql;
+	int64		row;
+
+	Assert(partno >= 1 && partno <= partitions);
+
+	snprintf(table_name, sizeof(table_name), "pgbench_accounts_%d", partno);
+	snprintf(truncate_stmt, sizeof(truncate_stmt), "truncate %s", table_name);
+
+	if (PQserverVersion(con) >= 140000)
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin with (freeze on)", table_name);
+	else
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin", table_name);
+
+	if (n < 0)
+		pg_fatal("invalid format string");
+	if ((size_t) n >= sizeof(copy_stmt))
+		pg_fatal("invalid buffer size: must be at least %d characters long", n);
+
+	executeStatement(con, truncate_stmt);
+
+	res = PQexec(con, copy_stmt);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(res);
+
+	initPQExpBuffer(&sql);
+
+	for (row = start_row; row < end_row; row++)
+	{
+		initAccount(&sql, row);
+		if (PQputCopyData(con, sql.data, sql.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d: %s",
+					 partno, PQerrorMessage(con));
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d: %s",
+				 partno, PQerrorMessage(con));
+
+	res = PQgetResult(con);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con));
+	PQclear(res);
+
+	/* the COPY status should be the last result; drain any others */
+	while ((res = PQgetResult(con)) != NULL)
+		PQclear(res);
+
+	termPQExpBuffer(&sql);
+}
+
+/*
+ * Thread body for one partition worker.  "arg" points to the 1-based
+ * partition number to load.  Each worker uses its own connection, since a
+ * libpq connection must not be used by several threads at once.
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+	int			partno = *(const int *) arg;
+	PGconn	   *con = doConnect();
+
+	if (con == NULL)
+		pg_fatal("could not create connection for partition worker %d", partno);
+
+	/* TRUNCATE and COPY must share this transaction for COPY FREEZE */
+	executeStatement(con, "begin");
+	initPopulatePartition(con, partno);
+	executeStatement(con, "commit");
+
+	PQfinish(con);
+	THREAD_FUNC_RETURN;
+}
+
+/*
+ * Load pgbench_accounts in parallel: start one worker thread per range
+ * partition, then wait for all of them to finish.
+ *
+ * NOTE(review): this opens "partitions" threads and server connections at
+ * the same time, so a large --partitions value can run into the server's
+ * connection limit; consider capping the number of concurrent workers.
+ */
+static void
+initLoadAccountsParallel(void)
+{
+	THREAD_T   *threads;
+	int		   *partno;
+
+	fprintf(stderr, "loading pgbench_accounts with %d threads...\n", partitions);
+
+	threads = pg_malloc_array(THREAD_T, partitions);
+	/* each thread reads its own slot, which stays valid past the join */
+	partno = pg_malloc_array(int, partitions);
+
+	for (int i = 0; i < partitions; i++)
+	{
+		partno[i] = i + 1;
+		errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &partno[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for partition %d: %m", i + 1);
+	}
+
+	for (int i = 0; i < partitions; i++)
+		THREAD_JOIN(threads[i]);
+
+	pg_free(threads);
+	pg_free(partno);
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5155,8 +5255,11 @@ initGenerateDataClientSide(PGconn *con)
 	fprintf(stderr, "generating data (client-side)...\n");
 
 	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
+	 * For the non-partitioned and hash-partitioned cases, do everything in
+	 * one transaction to enable the backend's data-loading optimizations. For
+	 * range-partitioned tables, branches and tellers are loaded in one
+	 * transaction, then accounts are loaded in parallel with one thread per
+	 * partition, each in its own transaction.
 	 */
 	executeStatement(con, "begin");
 
@@ -5169,9 +5272,18 @@ initGenerateDataClientSide(PGconn *con)
 	 */
 	initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
 	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-	executeStatement(con, "commit");
+	if (partition_method == PART_RANGE)
+	{
+		executeStatement(con, "commit");
+		initLoadAccountsParallel();
+	}
+	else
+	{
+		/* hash partitioning and non-partitioned tables use serial loading */
+		initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+		executeStatement(con, "commit");
+	}
 }
 
 /*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..b59c181c2a 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,24 @@ $node->pgbench(
 # Check data state, after server-side data generation.
 check_data_state($node, 'server-side');
 
+# Test parallel initialization with range partitions (client-side generation).
+# One thread per partition is spawned automatically.
+$node->pgbench(
+	'-i -s 1 --partitions=4 --partition-method=range',
+	0,
+	[qr{^$}],
+	[
+		qr{creating tables},
+		qr{creating 4 partitions},
+		qr{loading pgbench_accounts with 4 threads},
+		qr{vacuuming},
+		qr{creating primary keys},
+		qr{done in \d+\.\d\d s }
+	],
+	'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
 # Run all builtin scripts, for a few transactions each
 $node->pgbench(
 	'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
-- 
2.39.5 (Apple Git-154)



view thread (11+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
  Subject: Re: parallel data loading for pgbench -i
  In-Reply-To: <[email protected]>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox