public inbox for [email protected]
help / color / mirror / Atom feed
From: Mircea Cadariu <[email protected]>
To: lakshmi <[email protected]>
To: Hayato Kuroda (Fujitsu) <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Heikki Linnakangas <[email protected]>
Subject: Re: parallel data loading for pgbench -i
Date: Fri, 8 May 2026 19:11:46 +0100
Message-ID: <[email protected]> (raw)
In-Reply-To: <OS9PR01MB12149E0E2CAAB378D264A35B1F5242@OS9PR01MB12149.jpnprd01.prod.outlook.com>
References: <[email protected]>
<CAEvyyTj0rEsgcQOQgkARbRPbupHR_mc=TUzHBBLNzd8JByUUTw@mail.gmail.com>
<[email protected]>
<OS9PR01MB12149A6CEE200E1A5A88D9F14F563A@OS9PR01MB12149.jpnprd01.prod.outlook.com>
<CAEvyyTiQqd=rv3XUxc0YEaW-feopksBveZKKjVZNeSVG=GrY+A@mail.gmail.com>
<TYRPR01MB121560B291DA3CD262CC7A09AF568A@TYRPR01MB12156.jpnprd01.prod.outlook.com>
<CAEvyyTjPWfvJLn3c_G_zLRffZ3=YqzMYj6c5znaNxpHyZAg3XQ@mail.gmail.com>
<[email protected]>
<CAEvyyTircZ-tHgap=J6Aog0CBgXp4Dqx6dHYyK1iqgfoT+8D_A@mail.gmail.com>
<[email protected]>
<[email protected]>
<OS9PR01MB12149E0E2CAAB378D264A35B1F5242@OS9PR01MB12149.jpnprd01.prod.outlook.com>
Hi Lakshmi and Hayato,
Thanks a lot for your feedback.
Attached for your consideration is v4, in which I address your remarks.
--
Thanks,
Mircea Cadariu
From 6099f30cf78b0ed8608670ff07b8a71b8cf0d47c Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 3 May 2026 16:42:20 +0100
Subject: [PATCH v4] pgbench: parallelize account loading for range-partitioned
tables
In init mode with range partitioning, -j > 1 loads pgbench_accounts
in parallel. Each worker creates its assigned partitions as
standalone tables, populates them with COPY FREEZE, and the main
connection attaches them afterwards.
---
doc/src/sgml/ref/pgbench.sgml | 9 +
src/bin/pgbench/pgbench.c | 258 +++++++++++++++++--
src/bin/pgbench/t/001_pgbench_with_server.pl | 29 +++
3 files changed, 269 insertions(+), 27 deletions(-)
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..3594b731cc 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -382,6 +382,11 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
the scaled number of accounts.
Default is <literal>0</literal>, meaning no partitioning.
</para>
+ <para>
+ With <option>-j</option> greater than 1 and
+ <option>--partition-method=range</option>, partitions are
+ loaded in parallel.
+ </para>
</listitem>
</varlistentry>
@@ -502,6 +507,10 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
Clients are distributed as evenly as possible among available threads.
Default is 1.
</para>
+ <para>
+ In initialization mode (<option>-i</option>), <option>-j</option>
+ sets the number of threads used to load partitions in parallel.
+ </para>
</listitem>
</varlistentry>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..aa21b653ce 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4817,6 +4817,34 @@ initDropTables(PGconn *con)
"pgbench_tellers");
}
+/*
+ * Append the " for values from (...) to (...)" bound clause of range
+ * partition "p" (numbered from 1) of pgbench_accounts to "query".
+ *
+ * The lower bound of partition 1 is minvalue and the upper bound of the
+ * last partition is maxvalue; every other bound is a multiple of the
+ * (rounded-up) partition size, plus one.
+ */
+static void
+appendAccountsRangeForValues(PQExpBufferData *query, int p)
+{
+	int64		total_rows = naccounts * (int64) scale;
+	int64		rows_per_part = (total_rows + partitions - 1) / partitions;
+	int64		lower = (p - 1) * rows_per_part + 1;
+	int64		upper = p * rows_per_part + 1;
+
+	appendPQExpBufferStr(query, " for values from (");
+	if (p != 1)
+		appendPQExpBuffer(query, INT64_FORMAT, lower);
+	else
+		appendPQExpBufferStr(query, "minvalue");
+
+	appendPQExpBufferStr(query, ") to (");
+	if (p >= partitions)
+		appendPQExpBufferStr(query, "maxvalue");
+	else
+		appendPQExpBuffer(query, INT64_FORMAT, upper);
+
+	appendPQExpBufferChar(query, ')');
+}
+
+/*
+ * Compute the half-open range [*start_row, *end_row) of 0-based account row
+ * numbers that belong to range partition "p" (numbered from 1).
+ *
+ * The partition size is rounded up, so with many partitions the nominal
+ * bound p * part_size can run past the total number of rows before the last
+ * partition is reached (for example, 100000 rows in 999 partitions gives a
+ * size of 101, and 998 * 101 > 100000).  Both bounds are therefore clamped
+ * to the total row count: exactly naccounts * scale rows are produced over
+ * all partitions, and trailing partitions may come out empty.
+ */
+static void
+getAccountsPartitionRows(int p, int64 *start_row, int64 *end_row)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	int64		first = (int64) (p - 1) * part_size;
+	int64		last = (int64) p * part_size;
+
+	/* never start or stop beyond the last existing row */
+	*start_row = (first < total_rows) ? first : total_rows;
+	*end_row = (p == partitions || last > total_rows) ? total_rows : last;
+}
+
/*
* Create "pgbench_accounts" partitions if needed.
*
@@ -4839,33 +4867,17 @@ createPartitions(PGconn *con)
{
if (partition_method == PART_RANGE)
{
- int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
-
- printfPQExpBuffer(&query,
- "create%s table pgbench_accounts_%d\n"
- " partition of pgbench_accounts\n"
- " for values from (",
- unlogged_tables ? " unlogged" : "", p);
-
/*
* For RANGE, we use open-ended partitions at the beginning and
* end to allow any valid value for the primary key. Although the
* actual minimum and maximum values can be derived from the
* scale, it is more generic and the performance is better.
*/
- if (p == 1)
- appendPQExpBufferStr(&query, "minvalue");
- else
- appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1);
-
- appendPQExpBufferStr(&query, ") to (");
-
- if (p < partitions)
- appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1);
- else
- appendPQExpBufferStr(&query, "maxvalue");
-
- appendPQExpBufferChar(&query, ')');
+ printfPQExpBuffer(&query,
+ "create%s table pgbench_accounts_%d\n"
+ " partition of pgbench_accounts",
+ unlogged_tables ? " unlogged" : "", p);
+ appendAccountsRangeForValues(&query, p);
}
else if (partition_method == PART_HASH)
printfPQExpBuffer(&query,
@@ -4889,6 +4901,62 @@ createPartitions(PGconn *con)
termPQExpBuffer(&query);
}
+/*
+ * Create partitions part_start..part_end (inclusive) of pgbench_accounts as
+ * standalone tables named pgbench_accounts_N, using connection "con".
+ *
+ * Each table gets an explicit column list and the configured fillfactor.
+ * The tables are not children of pgbench_accounts until
+ * attachStandalonePartitions() attaches them.
+ *
+ * NOTE(review): the column list presumably has to stay in sync with the
+ * pgbench_accounts definition in initCreateTables(), and no tablespace
+ * clause is emitted here -- confirm both against the parent table's DDL.
+ */
+static void
+createStandalonePartitions(PGconn *con, int part_start, int part_end)
+{
+	PQExpBufferData ddl;
+	const char *persistence = unlogged_tables ? " unlogged" : "";
+	const char *aid_type;
+	int			partno;
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	/* aid needs 64 bits once the scale crosses the 32-bit threshold */
+	if (scale >= SCALE_32BIT_THRESHOLD)
+		aid_type = "bigint";
+	else
+		aid_type = "int";
+
+	initPQExpBuffer(&ddl);
+
+	partno = part_start;
+	while (partno <= part_end)
+	{
+		printfPQExpBuffer(&ddl,
+						  "create%s table pgbench_accounts_%d\n"
+						  " (aid %s not null,\n"
+						  " bid int,\n"
+						  " abalance int,\n"
+						  " filler character(84))\n"
+						  " with (fillfactor=%d)",
+						  persistence, partno,
+						  aid_type, fillfactor);
+
+		executeStatement(con, ddl.data);
+		partno++;
+	}
+
+	termPQExpBuffer(&ddl);
+}
+
+/*
+ * Attach the standalone tables pgbench_accounts_1 .. pgbench_accounts_N
+ * (N = partitions) to pgbench_accounts as range partitions.  The bounds come
+ * from appendAccountsRangeForValues(), the same helper createPartitions()
+ * uses.  All attachments are done in one transaction on "con".
+ *
+ * NOTE(review): the only caller visible in this patch is
+ * initGenerateDataClientSide(), while initCreateTables() skips
+ * createPartitions() whenever range partitioning is combined with -j > 1 --
+ * confirm that server-side data generation still gets its partitions.
+ */
+static void
+attachStandalonePartitions(PGconn *con)
+{
+	PQExpBufferData cmd;
+	int			partno;
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	initPQExpBuffer(&cmd);
+
+	executeStatement(con, "begin");
+
+	partno = 1;
+	while (partno <= partitions)
+	{
+		printfPQExpBuffer(&cmd,
+						  "alter table pgbench_accounts\n"
+						  " attach partition pgbench_accounts_%d",
+						  partno);
+		appendAccountsRangeForValues(&cmd, partno);
+		executeStatement(con, cmd.data);
+		partno++;
+	}
+
+	executeStatement(con, "commit");
+
+	termPQExpBuffer(&cmd);
+}
+
/*
* Create pgbench's standard tables
*/
@@ -4981,7 +5049,17 @@ initCreateTables(PGconn *con)
termPQExpBuffer(&query);
if (partition_method != PART_NONE)
+ {
+ /*
+ * In the parallel range-partitioned case, partitions are created by
+ * the worker threads (so each one can use COPY FREEZE in its own
+ * transaction) and attached afterwards.
+ */
+ if (partition_method == PART_RANGE && nthreads > 1)
+ return;
+
createPartitions(con);
+ }
}
/*
@@ -5143,6 +5221,121 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
termPQExpBuffer(&sql);
}
+/*
+ * Load the rows of range partition "partno" into its standalone table
+ * pgbench_accounts_<partno> with COPY ... FROM STDIN on "con".
+ *
+ * The row range comes from getAccountsPartitionRows() and each row is
+ * formatted by initAccount().  On servers of version 14 or later the COPY
+ * uses the FREEZE option; the caller is expected to have created the table
+ * in the current transaction, as initPartitionWorkerThread() does.  Any
+ * failure is reported with pg_fatal().
+ */
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	char		copy_cmd[256];
+	PQExpBufferData line;
+	PGresult   *result;
+	int64		first_row;
+	int64		stop_row;
+	int64		cur;
+
+	getAccountsPartitionRows(partno, &first_row, &stop_row);
+
+	if (PQserverVersion(con) >= 140000)
+		snprintf(copy_cmd, sizeof(copy_cmd),
+				 "copy pgbench_accounts_%d from stdin with (freeze on)",
+				 partno);
+	else
+		snprintf(copy_cmd, sizeof(copy_cmd),
+				 "copy pgbench_accounts_%d from stdin",
+				 partno);
+
+	/* put the connection into COPY IN state */
+	result = PQexec(con, copy_cmd);
+	if (PQresultStatus(result) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(result);
+
+	initPQExpBuffer(&line);
+
+	/* stream one formatted row at a time */
+	cur = first_row;
+	while (cur < stop_row)
+	{
+		initAccount(&line, cur);
+		if (PQputCopyData(con, line.data, line.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d", partno);
+		cur++;
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d", partno);
+
+	/* fetch the final status of the COPY command */
+	result = PQgetResult(con);
+	if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con));
+	PQclear(result);
+
+	termPQExpBuffer(&line);
+}
+
+/*
+ * Work assignment handed to one partition-loading worker thread.
+ */
+typedef struct PartitionWorkerData
+{
+	int			thread_id;		/* worker number, used in progress messages */
+	int			part_start;		/* first partition to load */
+	int			part_end;		/* last partition to load (inclusive) */
+} PartitionWorkerData;
+
+/*
+ * Thread entry point for parallel loading of pgbench_accounts.
+ *
+ * "arg" points to a PartitionWorkerData.  The worker opens its own
+ * connection and, inside a single transaction, creates its assigned
+ * partitions as standalone tables and populates them one by one, printing a
+ * progress line with the elapsed time after each partition.  A connection
+ * failure is reported with pg_fatal().
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+	PartitionWorkerData *work = (PartitionWorkerData *) arg;
+	PGconn	   *con;
+	int			partno;
+
+	con = doConnect();
+	if (con == NULL)
+		pg_fatal("could not create connection for partition worker (parts %d-%d)",
+				 work->part_start, work->part_end);
+
+	/* create and load in the same transaction */
+	executeStatement(con, "begin");
+	createStandalonePartitions(con, work->part_start, work->part_end);
+
+	partno = work->part_start;
+	while (partno <= work->part_end)
+	{
+		pg_time_usec_t t0 = pg_time_now();
+		double		elapsed;
+
+		initPopulatePartition(con, partno);
+		elapsed = PG_TIME_GET_DOUBLE(pg_time_now() - t0);
+		fprintf(stderr, "partition %d loaded by thread %d (in %.2f s)\n",
+				partno, work->thread_id, elapsed);
+		partno++;
+	}
+
+	executeStatement(con, "commit");
+
+	PQfinish(con);
+	THREAD_FUNC_RETURN;
+}
+
+/*
+ * Load pgbench_accounts in parallel: split partitions 1..partitions into
+ * contiguous ranges, start one worker thread per range, and wait for all of
+ * them to finish.
+ *
+ * At most one worker per partition is started, so asking for more threads
+ * than there are partitions does not open server connections that would
+ * have nothing to load.  Partitions are spread as evenly as possible: the
+ * first (partitions % workers) workers get one extra partition.
+ */
+static void
+initLoadAccountsParallel(void)
+{
+	THREAD_T   *threads;
+	PartitionWorkerData *data;
+	int			nworkers;
+	int			parts_per_worker;
+	int			extra_parts;
+	int			next_part = 1;
+	int			i;
+
+	/* never start more workers than there are partitions to load */
+	nworkers = (nthreads < partitions) ? nthreads : partitions;
+	if (nworkers < 1)
+		nworkers = 1;
+
+	parts_per_worker = partitions / nworkers;
+	extra_parts = partitions % nworkers;
+
+	fprintf(stderr, "creating %d partitions...\n", partitions);
+	fprintf(stderr, "loading pgbench_accounts with %d threads...\n", nworkers);
+
+	threads = pg_malloc_array(THREAD_T, nworkers);
+	data = pg_malloc_array(PartitionWorkerData, nworkers);
+
+	for (i = 0; i < nworkers; i++)
+	{
+		data[i].thread_id = i;
+		data[i].part_start = next_part;
+		data[i].part_end = next_part + parts_per_worker - 1 +
+			(i < extra_parts ? 1 : 0);
+		next_part = data[i].part_end + 1;
+
+		errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &data[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for worker %d: %m", i);
+	}
+
+	/* the work descriptors must stay allocated until every worker is done */
+	for (i = 0; i < nworkers; i++)
+		THREAD_JOIN(threads[i]);
+
+	free(threads);
+	free(data);
+}
+
/*
* Fill the standard tables with some data generated and sent from the client.
*
@@ -5155,8 +5348,11 @@ initGenerateDataClientSide(PGconn *con)
fprintf(stderr, "generating data (client-side)...\n");
/*
- * we do all of this in one transaction to enable the backend's
- * data-loading optimizations
+ * For serial loading, do everything in one transaction to enable the
+ * backend's data-loading optimizations. For parallel loading
+ * (range-partitioned, -j > 1), load branches and tellers in one
+ * transaction, then load accounts in parallel with each worker in its own
+ * transaction.
*/
executeStatement(con, "begin");
@@ -5169,9 +5365,18 @@ initGenerateDataClientSide(PGconn *con)
*/
initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
- initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
- executeStatement(con, "commit");
+ if (partition_method == PART_RANGE && nthreads > 1)
+ {
+ executeStatement(con, "commit");
+ initLoadAccountsParallel();
+ attachStandalonePartitions(con);
+ }
+ else
+ {
+ initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+ executeStatement(con, "commit");
+ }
}
/*
@@ -6944,7 +7149,6 @@ main(int argc, char **argv)
initialization_option_set = true;
break;
case 'j': /* jobs */
- benchmarking_option_set = true;
if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
&nthreads))
{
@@ -7176,7 +7380,7 @@ main(int argc, char **argv)
* optimization; throttle_delay is calculated incorrectly below if some
* threads have no clients assigned to them.)
*/
- if (nthreads > nclients)
+ if (nthreads > nclients && !is_init_mode)
nthreads = nclients;
/*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..29ee28d616 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,35 @@ $node->pgbench(
# Check data state, after server-side data generation.
check_data_state($node, 'server-side');
+# Test parallel initialization with range partitions (client-side generation).
+# Use -j to control the number of worker threads; with partitions >= -j every
+# worker has at least one partition to load.
+$node->pgbench(
+ '-i -j 2 -s 1 --partitions=4 --partition-method=range',
+ 0,
+ [qr{^$}],
+ [
+ qr{creating tables},
+ qr{creating 4 partitions},
+ qr{loading pgbench_accounts with 2 threads},
+ qr{partition \d loaded by thread \d \(in \d+\.\d\d s\)},
+ qr{vacuuming},
+ qr{creating primary keys},
+ qr{done in \d+\.\d\d s }
+ ],
+ 'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
+# Uneven distribution: 5 partitions across 2 threads (3 + 2).
+$node->pgbench(
+ '-i -j 2 -s 1 --partitions=5 --partition-method=range',
+ 0,
+ [qr{^$}],
+ [ qr{loading pgbench_accounts with 2 threads}, qr{done in \d+\.\d\d s } ],
+ 'pgbench parallel init with uneven partition distribution');
+
+check_data_state($node, 'parallel-range-uneven');
+
# Run all builtin scripts, for a few transactions each
$node->pgbench(
'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
--
2.39.5 (Apple Git-154)
Attachments:
[text/plain] v4-0001-pgbench-parallelize-account-loading-for-range-partit.patch (12.4K, 2-v4-0001-pgbench-parallelize-account-loading-for-range-partit.patch)
download | inline diff:
From 6099f30cf78b0ed8608670ff07b8a71b8cf0d47c Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 3 May 2026 16:42:20 +0100
Subject: [PATCH v4] pgbench: parallelize account loading for range-partitioned
tables
In init mode with range partitioning, -j > 1 loads pgbench_accounts
in parallel. Each worker creates its assigned partitions as
standalone tables, populates them with COPY FREEZE, and the main
connection attaches them afterwards.
---
doc/src/sgml/ref/pgbench.sgml | 9 +
src/bin/pgbench/pgbench.c | 258 +++++++++++++++++--
src/bin/pgbench/t/001_pgbench_with_server.pl | 29 +++
3 files changed, 269 insertions(+), 27 deletions(-)
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..3594b731cc 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -382,6 +382,11 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
the scaled number of accounts.
Default is <literal>0</literal>, meaning no partitioning.
</para>
+ <para>
+ With <option>-j</option> greater than 1 and
+ <option>--partition-method=range</option>, partitions are
+ loaded in parallel.
+ </para>
</listitem>
</varlistentry>
@@ -502,6 +507,10 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
Clients are distributed as evenly as possible among available threads.
Default is 1.
</para>
+ <para>
+ In initialization mode (<option>-i</option>), <option>-j</option>
+ sets the number of threads used to load partitions in parallel.
+ </para>
</listitem>
</varlistentry>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..aa21b653ce 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4817,6 +4817,34 @@ initDropTables(PGconn *con)
"pgbench_tellers");
}
+/*
+ * Append the " for values from (...) to (...)" bound clause of range
+ * partition "p" (numbered from 1) of pgbench_accounts to "query".
+ *
+ * The lower bound of partition 1 is minvalue and the upper bound of the
+ * last partition is maxvalue; every other bound is a multiple of the
+ * (rounded-up) partition size, plus one.
+ */
+static void
+appendAccountsRangeForValues(PQExpBufferData *query, int p)
+{
+	int64		total_rows = naccounts * (int64) scale;
+	int64		rows_per_part = (total_rows + partitions - 1) / partitions;
+	int64		lower = (p - 1) * rows_per_part + 1;
+	int64		upper = p * rows_per_part + 1;
+
+	appendPQExpBufferStr(query, " for values from (");
+	if (p != 1)
+		appendPQExpBuffer(query, INT64_FORMAT, lower);
+	else
+		appendPQExpBufferStr(query, "minvalue");
+
+	appendPQExpBufferStr(query, ") to (");
+	if (p >= partitions)
+		appendPQExpBufferStr(query, "maxvalue");
+	else
+		appendPQExpBuffer(query, INT64_FORMAT, upper);
+
+	appendPQExpBufferChar(query, ')');
+}
+
+/*
+ * Compute the half-open range [*start_row, *end_row) of 0-based account row
+ * numbers that belong to range partition "p" (numbered from 1).
+ *
+ * The partition size is rounded up, so with many partitions the nominal
+ * bound p * part_size can run past the total number of rows before the last
+ * partition is reached (for example, 100000 rows in 999 partitions gives a
+ * size of 101, and 998 * 101 > 100000).  Both bounds are therefore clamped
+ * to the total row count: exactly naccounts * scale rows are produced over
+ * all partitions, and trailing partitions may come out empty.
+ */
+static void
+getAccountsPartitionRows(int p, int64 *start_row, int64 *end_row)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	int64		first = (int64) (p - 1) * part_size;
+	int64		last = (int64) p * part_size;
+
+	/* never start or stop beyond the last existing row */
+	*start_row = (first < total_rows) ? first : total_rows;
+	*end_row = (p == partitions || last > total_rows) ? total_rows : last;
+}
+
/*
* Create "pgbench_accounts" partitions if needed.
*
@@ -4839,33 +4867,17 @@ createPartitions(PGconn *con)
{
if (partition_method == PART_RANGE)
{
- int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
-
- printfPQExpBuffer(&query,
- "create%s table pgbench_accounts_%d\n"
- " partition of pgbench_accounts\n"
- " for values from (",
- unlogged_tables ? " unlogged" : "", p);
-
/*
* For RANGE, we use open-ended partitions at the beginning and
* end to allow any valid value for the primary key. Although the
* actual minimum and maximum values can be derived from the
* scale, it is more generic and the performance is better.
*/
- if (p == 1)
- appendPQExpBufferStr(&query, "minvalue");
- else
- appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1);
-
- appendPQExpBufferStr(&query, ") to (");
-
- if (p < partitions)
- appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1);
- else
- appendPQExpBufferStr(&query, "maxvalue");
-
- appendPQExpBufferChar(&query, ')');
+ printfPQExpBuffer(&query,
+ "create%s table pgbench_accounts_%d\n"
+ " partition of pgbench_accounts",
+ unlogged_tables ? " unlogged" : "", p);
+ appendAccountsRangeForValues(&query, p);
}
else if (partition_method == PART_HASH)
printfPQExpBuffer(&query,
@@ -4889,6 +4901,62 @@ createPartitions(PGconn *con)
termPQExpBuffer(&query);
}
+/*
+ * Create partitions part_start..part_end (inclusive) of pgbench_accounts as
+ * standalone tables named pgbench_accounts_N, using connection "con".
+ *
+ * Each table gets an explicit column list and the configured fillfactor.
+ * The tables are not children of pgbench_accounts until
+ * attachStandalonePartitions() attaches them.
+ *
+ * NOTE(review): the column list presumably has to stay in sync with the
+ * pgbench_accounts definition in initCreateTables(), and no tablespace
+ * clause is emitted here -- confirm both against the parent table's DDL.
+ */
+static void
+createStandalonePartitions(PGconn *con, int part_start, int part_end)
+{
+	PQExpBufferData ddl;
+	const char *persistence = unlogged_tables ? " unlogged" : "";
+	const char *aid_type;
+	int			partno;
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	/* aid needs 64 bits once the scale crosses the 32-bit threshold */
+	if (scale >= SCALE_32BIT_THRESHOLD)
+		aid_type = "bigint";
+	else
+		aid_type = "int";
+
+	initPQExpBuffer(&ddl);
+
+	partno = part_start;
+	while (partno <= part_end)
+	{
+		printfPQExpBuffer(&ddl,
+						  "create%s table pgbench_accounts_%d\n"
+						  " (aid %s not null,\n"
+						  " bid int,\n"
+						  " abalance int,\n"
+						  " filler character(84))\n"
+						  " with (fillfactor=%d)",
+						  persistence, partno,
+						  aid_type, fillfactor);
+
+		executeStatement(con, ddl.data);
+		partno++;
+	}
+
+	termPQExpBuffer(&ddl);
+}
+
+/*
+ * Attach the standalone tables pgbench_accounts_1 .. pgbench_accounts_N
+ * (N = partitions) to pgbench_accounts as range partitions.  The bounds come
+ * from appendAccountsRangeForValues(), the same helper createPartitions()
+ * uses.  All attachments are done in one transaction on "con".
+ *
+ * NOTE(review): the only caller visible in this patch is
+ * initGenerateDataClientSide(), while initCreateTables() skips
+ * createPartitions() whenever range partitioning is combined with -j > 1 --
+ * confirm that server-side data generation still gets its partitions.
+ */
+static void
+attachStandalonePartitions(PGconn *con)
+{
+	PQExpBufferData cmd;
+	int			partno;
+
+	Assert(partitions > 0);
+	Assert(partition_method == PART_RANGE);
+
+	initPQExpBuffer(&cmd);
+
+	executeStatement(con, "begin");
+
+	partno = 1;
+	while (partno <= partitions)
+	{
+		printfPQExpBuffer(&cmd,
+						  "alter table pgbench_accounts\n"
+						  " attach partition pgbench_accounts_%d",
+						  partno);
+		appendAccountsRangeForValues(&cmd, partno);
+		executeStatement(con, cmd.data);
+		partno++;
+	}
+
+	executeStatement(con, "commit");
+
+	termPQExpBuffer(&cmd);
+}
+
/*
* Create pgbench's standard tables
*/
@@ -4981,7 +5049,17 @@ initCreateTables(PGconn *con)
termPQExpBuffer(&query);
if (partition_method != PART_NONE)
+ {
+ /*
+ * In the parallel range-partitioned case, partitions are created by
+ * the worker threads (so each one can use COPY FREEZE in its own
+ * transaction) and attached afterwards.
+ */
+ if (partition_method == PART_RANGE && nthreads > 1)
+ return;
+
createPartitions(con);
+ }
}
/*
@@ -5143,6 +5221,121 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
termPQExpBuffer(&sql);
}
+/*
+ * Load the rows of range partition "partno" into its standalone table
+ * pgbench_accounts_<partno> with COPY ... FROM STDIN on "con".
+ *
+ * The row range comes from getAccountsPartitionRows() and each row is
+ * formatted by initAccount().  On servers of version 14 or later the COPY
+ * uses the FREEZE option; the caller is expected to have created the table
+ * in the current transaction, as initPartitionWorkerThread() does.  Any
+ * failure is reported with pg_fatal().
+ */
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	char		copy_cmd[256];
+	PQExpBufferData line;
+	PGresult   *result;
+	int64		first_row;
+	int64		stop_row;
+	int64		cur;
+
+	getAccountsPartitionRows(partno, &first_row, &stop_row);
+
+	if (PQserverVersion(con) >= 140000)
+		snprintf(copy_cmd, sizeof(copy_cmd),
+				 "copy pgbench_accounts_%d from stdin with (freeze on)",
+				 partno);
+	else
+		snprintf(copy_cmd, sizeof(copy_cmd),
+				 "copy pgbench_accounts_%d from stdin",
+				 partno);
+
+	/* put the connection into COPY IN state */
+	result = PQexec(con, copy_cmd);
+	if (PQresultStatus(result) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(result);
+
+	initPQExpBuffer(&line);
+
+	/* stream one formatted row at a time */
+	cur = first_row;
+	while (cur < stop_row)
+	{
+		initAccount(&line, cur);
+		if (PQputCopyData(con, line.data, line.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d", partno);
+		cur++;
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d", partno);
+
+	/* fetch the final status of the COPY command */
+	result = PQgetResult(con);
+	if (PQresultStatus(result) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno, PQerrorMessage(con));
+	PQclear(result);
+
+	termPQExpBuffer(&line);
+}
+
+/*
+ * Work assignment handed to one partition-loading worker thread.
+ */
+typedef struct PartitionWorkerData
+{
+	int			thread_id;		/* worker number, used in progress messages */
+	int			part_start;		/* first partition to load */
+	int			part_end;		/* last partition to load (inclusive) */
+} PartitionWorkerData;
+
+/*
+ * Thread entry point for parallel loading of pgbench_accounts.
+ *
+ * "arg" points to a PartitionWorkerData.  The worker opens its own
+ * connection and, inside a single transaction, creates its assigned
+ * partitions as standalone tables and populates them one by one, printing a
+ * progress line with the elapsed time after each partition.  A connection
+ * failure is reported with pg_fatal().
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+	PartitionWorkerData *work = (PartitionWorkerData *) arg;
+	PGconn	   *con;
+	int			partno;
+
+	con = doConnect();
+	if (con == NULL)
+		pg_fatal("could not create connection for partition worker (parts %d-%d)",
+				 work->part_start, work->part_end);
+
+	/* create and load in the same transaction */
+	executeStatement(con, "begin");
+	createStandalonePartitions(con, work->part_start, work->part_end);
+
+	partno = work->part_start;
+	while (partno <= work->part_end)
+	{
+		pg_time_usec_t t0 = pg_time_now();
+		double		elapsed;
+
+		initPopulatePartition(con, partno);
+		elapsed = PG_TIME_GET_DOUBLE(pg_time_now() - t0);
+		fprintf(stderr, "partition %d loaded by thread %d (in %.2f s)\n",
+				partno, work->thread_id, elapsed);
+		partno++;
+	}
+
+	executeStatement(con, "commit");
+
+	PQfinish(con);
+	THREAD_FUNC_RETURN;
+}
+
+/*
+ * Load pgbench_accounts in parallel: split partitions 1..partitions into
+ * contiguous ranges, start one worker thread per range, and wait for all of
+ * them to finish.
+ *
+ * At most one worker per partition is started, so asking for more threads
+ * than there are partitions does not open server connections that would
+ * have nothing to load.  Partitions are spread as evenly as possible: the
+ * first (partitions % workers) workers get one extra partition.
+ */
+static void
+initLoadAccountsParallel(void)
+{
+	THREAD_T   *threads;
+	PartitionWorkerData *data;
+	int			nworkers;
+	int			parts_per_worker;
+	int			extra_parts;
+	int			next_part = 1;
+	int			i;
+
+	/* never start more workers than there are partitions to load */
+	nworkers = (nthreads < partitions) ? nthreads : partitions;
+	if (nworkers < 1)
+		nworkers = 1;
+
+	parts_per_worker = partitions / nworkers;
+	extra_parts = partitions % nworkers;
+
+	fprintf(stderr, "creating %d partitions...\n", partitions);
+	fprintf(stderr, "loading pgbench_accounts with %d threads...\n", nworkers);
+
+	threads = pg_malloc_array(THREAD_T, nworkers);
+	data = pg_malloc_array(PartitionWorkerData, nworkers);
+
+	for (i = 0; i < nworkers; i++)
+	{
+		data[i].thread_id = i;
+		data[i].part_start = next_part;
+		data[i].part_end = next_part + parts_per_worker - 1 +
+			(i < extra_parts ? 1 : 0);
+		next_part = data[i].part_end + 1;
+
+		errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &data[i]);
+		if (errno != 0)
+			pg_fatal("could not create thread for worker %d: %m", i);
+	}
+
+	/* the work descriptors must stay allocated until every worker is done */
+	for (i = 0; i < nworkers; i++)
+		THREAD_JOIN(threads[i]);
+
+	free(threads);
+	free(data);
+}
+
/*
* Fill the standard tables with some data generated and sent from the client.
*
@@ -5155,8 +5348,11 @@ initGenerateDataClientSide(PGconn *con)
fprintf(stderr, "generating data (client-side)...\n");
/*
- * we do all of this in one transaction to enable the backend's
- * data-loading optimizations
+ * For serial loading, do everything in one transaction to enable the
+ * backend's data-loading optimizations. For parallel loading
+ * (range-partitioned, -j > 1), load branches and tellers in one
+ * transaction, then load accounts in parallel with each worker in its own
+ * transaction.
*/
executeStatement(con, "begin");
@@ -5169,9 +5365,18 @@ initGenerateDataClientSide(PGconn *con)
*/
initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
- initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
- executeStatement(con, "commit");
+ if (partition_method == PART_RANGE && nthreads > 1)
+ {
+ executeStatement(con, "commit");
+ initLoadAccountsParallel();
+ attachStandalonePartitions(con);
+ }
+ else
+ {
+ initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+ executeStatement(con, "commit");
+ }
}
/*
@@ -6944,7 +7149,6 @@ main(int argc, char **argv)
initialization_option_set = true;
break;
case 'j': /* jobs */
- benchmarking_option_set = true;
if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
&nthreads))
{
@@ -7176,7 +7380,7 @@ main(int argc, char **argv)
* optimization; throttle_delay is calculated incorrectly below if some
* threads have no clients assigned to them.)
*/
- if (nthreads > nclients)
+ if (nthreads > nclients && !is_init_mode)
nthreads = nclients;
/*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..29ee28d616 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,35 @@ $node->pgbench(
# Check data state, after server-side data generation.
check_data_state($node, 'server-side');
+# Test parallel initialization with range partitions (client-side generation).
+# Use -j to control the number of worker threads; with partitions >= -j every
+# worker has at least one partition to load.
+$node->pgbench(
+ '-i -j 2 -s 1 --partitions=4 --partition-method=range',
+ 0,
+ [qr{^$}],
+ [
+ qr{creating tables},
+ qr{creating 4 partitions},
+ qr{loading pgbench_accounts with 2 threads},
+ qr{partition \d loaded by thread \d \(in \d+\.\d\d s\)},
+ qr{vacuuming},
+ qr{creating primary keys},
+ qr{done in \d+\.\d\d s }
+ ],
+ 'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
+# Uneven distribution: 5 partitions across 2 threads (3 + 2).
+$node->pgbench(
+ '-i -j 2 -s 1 --partitions=5 --partition-method=range',
+ 0,
+ [qr{^$}],
+ [ qr{loading pgbench_accounts with 2 threads}, qr{done in \d+\.\d\d s } ],
+ 'pgbench parallel init with uneven partition distribution');
+
+check_data_state($node, 'parallel-range-uneven');
+
# Run all builtin scripts, for a few transactions each
$node->pgbench(
'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
--
2.39.5 (Apple Git-154)
view thread (11+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
Subject: Re: parallel data loading for pgbench -i
In-Reply-To: <[email protected]>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox