public inbox for [email protected]
help / color / mirror / Atom feed
From: Boris Mironov <[email protected]>
To: [email protected] <[email protected]>
Subject: Idea to enhance pgbench by more modes to generate data (multi-TXNs, UNNEST, COPY BINARY)
Date: Tue, 11 Nov 2025 13:33:16 +0000
Message-ID: <DS0PR08MB9565D91414C65B3AC363825488CFA@DS0PR08MB9565.namprd08.prod.outlook.com> (raw)
Hello hackers,
For some of my specific hardware tests I needed to generate big databases well beyond RAM size. Hence I turned to the pgbench tool and its two default modes of client- and server-side data generation for TPC-B tests. When I use a "scale" factor in the range of a few thousand (e.g., 3000 - 5000), the data generation phase takes quite some time. I looked at this as an opportunity to prove or disprove several hypotheses:
* Will INSERT mode work faster if we commit once per "scale" unit, turning the single INSERT into a "for" loop that commits the rows for all 3 tables at the end of each iteration?
* Will "INSERT .. SELECT FROM unnest" be faster than "INSERT .. SELECT FROM generate_series"?
* Will BINARY mode work faster than TEXT even though we send much more data?
* and so on
As a result of my experiments I produced a significant patch for the pgbench utility and thought that it might be of interest not just to me. Therefore I'm sending a draft version of it in diff format against the current development tree on GitHub. As of November 11, 2025, it can be merged with the main branch of the project on GitHub.
Spoiler alert: "COPY FROM BINARY" is significantly faster than current "COPY FROM TEXT"
I would be happy to polish it if there is interest in such a change.
Cheers
Attachments:
[application/octet-stream] pgbench.c.diff (29.0K, 3-pgbench.c.diff)
download | inline diff:
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..71aa1d9479f 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -161,7 +161,7 @@ typedef struct socket_set
* some configurable parameters */
#define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */
-#define ALL_INIT_STEPS "dtgGvpf" /* all possible steps */
+#define ALL_INIT_STEPS "dtgCGiIvpf" /* all possible steps */
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
@@ -171,6 +171,14 @@ typedef struct socket_set
#define MIN_ZIPFIAN_PARAM 1.001 /* minimum parameter for zipfian */
#define MAX_ZIPFIAN_PARAM 1000.0 /* maximum parameter for zipfian */
+/* original single transaction server-side method */
+#define GEN_TYPE_INSERT_ORIGINAL 'G' /* use INSERT .. SELECT generate_series to generate data */
+/* 'one transaction per scale' server-side methods */
+#define GEN_TYPE_INSERT_SERIES 'i' /* use INSERT .. SELECT generate_series to generate data */
+#define GEN_TYPE_INSERT_UNNEST 'I' /* use INSERT .. SELECT unnest to generate data */
+#define GEN_TYPE_COPY_ORIGINAL 'g' /* use COPY .. FROM STDIN .. TEXT to generate data */
+#define GEN_TYPE_COPY_BINARY 'C' /* use COPY .. FROM STDIN .. BINARY to generate data */
+
static int nxacts = 0; /* number of transactions per client */
static int duration = 0; /* duration in seconds */
static int64 end_time = 0; /* when to stop in micro seconds, under -T */
@@ -181,6 +189,18 @@ static int64 end_time = 0; /* when to stop in micro seconds, under -T */
*/
static int scale = 1;
+/*
+ * mode of data generation to use
+ */
+static char data_generation_type = '?';
+
+/*
+ * COPY FROM BINARY execution buffer
+ */
+#define BIN_COPY_BUF_SIZE 102400 /* maximum buffer size for COPY FROM BINARY */
+static char *bin_copy_buffer = NULL; /* buffer for COPY FROM BINARY */
+static int32_t bin_copy_buffer_length = 0; /* current buffer size */
+
/*
* fillfactor. for example, fillfactor = 90 will use only 90 percent
* space during inserts and leave 10 percent free.
@@ -402,14 +422,15 @@ typedef struct StatsData
* directly successful transactions (they were successfully completed on
* the first try).
*
- * A failed transaction is defined as unsuccessfully retried transactions.
- * It can be one of two types:
- *
- * failed (the number of failed transactions) =
+ * 'failed' (the number of failed transactions) =
* 'serialization_failures' (they got a serialization error and were not
- * successfully retried) +
+ * successfully retried) +
* 'deadlock_failures' (they got a deadlock error and were not
- * successfully retried).
+ * successfully retried) +
+ * 'other_sql_failures' (they failed on the first try or after retries
+ * due to a SQL error other than serialization or
+ * deadlock; they are counted as a failed transaction
+ * only when --continue-on-error is specified).
*
* If the transaction was retried after a serialization or a deadlock
* error this does not guarantee that this retry was successful. Thus
@@ -421,7 +442,7 @@ typedef struct StatsData
*
* 'retried' (number of all retried transactions) =
* successfully retried transactions +
- * failed transactions.
+ * unsuccessful retried transactions.
*----------
*/
int64 cnt; /* number of successful transactions, not
@@ -440,6 +461,11 @@ typedef struct StatsData
int64 deadlock_failures; /* number of transactions that were not
* successfully retried after a deadlock
* error */
+ int64 other_sql_failures; /* number of failed transactions for
+ * reasons other than
+ * serialization/deadlock failure, which
+ * is counted if --continue-on-error is
+ * specified */
SimpleStats latency;
SimpleStats lag;
} StatsData;
@@ -457,6 +483,7 @@ typedef enum EStatus
{
ESTATUS_NO_ERROR = 0,
ESTATUS_META_COMMAND_ERROR,
+ ESTATUS_CONN_ERROR,
/* SQL errors */
ESTATUS_SERIALIZATION_ERROR,
@@ -770,6 +797,7 @@ static int64 total_weight = 0;
static bool verbose_errors = false; /* print verbose messages of all errors */
static bool exit_on_abort = false; /* exit when any client is aborted */
+static bool continue_on_error = false; /* continue after errors */
/* Builtin test scripts */
typedef struct BuiltinScript
@@ -842,7 +870,8 @@ static int wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);
/* callback used to build rows for COPY during data loading */
-typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
+typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
+typedef void (*initRowMethodBin) (PGconn *con, PGresult *res, int64_t curr, int32_t parent);
/* callback functions for our flex lexer */
static const PsqlScanCallbacks pgbench_callbacks = {
@@ -906,7 +935,10 @@ usage(void)
" d: drop any existing pgbench tables\n"
" t: create the tables used by the standard pgbench scenario\n"
" g: generate data, client-side\n"
- " G: generate data, server-side\n"
+ " C: client-side (single TNX) COPY .. FROM STDIN .. BINARY\n"
+ " G: generate data, server-side in single transaction\n"
+ " i: server-side (multiple TXNs) INSERT .. SELECT generate_series\n"
+ " I: server-side (multiple TXNs) INSERT .. SELECT unnest\n"
" v: invoke VACUUM on the standard tables\n"
" p: create primary key indexes on the standard tables\n"
" f: create foreign keys between the standard tables\n"
@@ -949,6 +981,7 @@ usage(void)
" -T, --time=NUM duration of benchmark test in seconds\n"
" -v, --vacuum-all vacuum all four standard tables before tests\n"
" --aggregate-interval=NUM aggregate data over NUM seconds\n"
+ " --continue-on-error continue running after an SQL error\n"
" --exit-on-abort exit when any client is aborted\n"
" --failures-detailed report the failures grouped by basic types\n"
" --log-prefix=PREFIX prefix for transaction time log file\n"
@@ -1467,6 +1500,7 @@ initStats(StatsData *sd, pg_time_usec_t start)
sd->retried = 0;
sd->serialization_failures = 0;
sd->deadlock_failures = 0;
+ sd->other_sql_failures = 0;
initSimpleStats(&sd->latency);
initSimpleStats(&sd->lag);
}
@@ -1516,6 +1550,9 @@ accumStats(StatsData *stats, bool skipped, double lat, double lag,
case ESTATUS_DEADLOCK_ERROR:
stats->deadlock_failures++;
break;
+ case ESTATUS_OTHER_SQL_ERROR:
+ stats->other_sql_failures++;
+ break;
default:
/* internal error which should never occur */
pg_fatal("unexpected error status: %d", estatus);
@@ -3231,11 +3268,43 @@ sendCommand(CState *st, Command *command)
}
/*
- * Get the error status from the error code.
+ * Read and discard all available results from the connection.
+ */
+static void
+discardAvailableResults(CState *st)
+{
+ PGresult *res = NULL;
+
+ for (;;)
+ {
+ res = PQgetResult(st->con);
+
+ /*
+ * Read and discard results until PQgetResult() returns NULL (no more
+ * results) or a connection failure is detected. If the pipeline
+ * status is PQ_PIPELINE_ABORTED, more results may still be available
+ * even after PQgetResult() returns NULL, so continue reading in that
+ * case.
+ */
+ if ((res == NULL && PQpipelineStatus(st->con) != PQ_PIPELINE_ABORTED) ||
+ PQstatus(st->con) == CONNECTION_BAD)
+ break;
+
+ PQclear(res);
+ }
+ PQclear(res);
+}
+
+/*
+ * Determine the error status based on the connection status and error code.
*/
static EStatus
-getSQLErrorStatus(const char *sqlState)
+getSQLErrorStatus(CState *st, const char *sqlState)
{
+ discardAvailableResults(st);
+ if (PQstatus(st->con) == CONNECTION_BAD)
+ return ESTATUS_CONN_ERROR;
+
if (sqlState != NULL)
{
if (strcmp(sqlState, ERRCODE_T_R_SERIALIZATION_FAILURE) == 0)
@@ -3257,6 +3326,17 @@ canRetryError(EStatus estatus)
estatus == ESTATUS_DEADLOCK_ERROR);
}
+/*
+ * Returns true if --continue-on-error is specified and this error allows
+ * processing to continue.
+ */
+static bool
+canContinueOnError(EStatus estatus)
+{
+ return (continue_on_error &&
+ estatus == ESTATUS_OTHER_SQL_ERROR);
+}
+
/*
* Process query response from the backend.
*
@@ -3375,9 +3455,9 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
- st->estatus = getSQLErrorStatus(PQresultErrorField(res,
- PG_DIAG_SQLSTATE));
- if (canRetryError(st->estatus))
+ st->estatus = getSQLErrorStatus(st, PQresultErrorField(res,
+ PG_DIAG_SQLSTATE));
+ if (canRetryError(st->estatus) || canContinueOnError(st->estatus))
{
if (verbose_errors)
commandError(st, PQresultErrorMessage(res));
@@ -3409,11 +3489,7 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
error:
PQclear(res);
PQclear(next_res);
- do
- {
- res = PQgetResult(st->con);
- PQclear(res);
- } while (res);
+ discardAvailableResults(st);
return false;
}
@@ -4041,7 +4117,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
st->state = CSTATE_END_COMMAND;
}
- else if (canRetryError(st->estatus))
+ else if (canRetryError(st->estatus) || canContinueOnError(st->estatus))
st->state = CSTATE_ERROR;
else
st->state = CSTATE_ABORTED;
@@ -4562,7 +4638,8 @@ static int64
getFailures(const StatsData *stats)
{
return (stats->serialization_failures +
- stats->deadlock_failures);
+ stats->deadlock_failures +
+ stats->other_sql_failures);
}
/*
@@ -4582,6 +4659,8 @@ getResultString(bool skipped, EStatus estatus)
return "serialization";
case ESTATUS_DEADLOCK_ERROR:
return "deadlock";
+ case ESTATUS_OTHER_SQL_ERROR:
+ return "other";
default:
/* internal error which should never occur */
pg_fatal("unexpected error status: %d", estatus);
@@ -4637,6 +4716,7 @@ doLog(TState *thread, CState *st,
int64 skipped = 0;
int64 serialization_failures = 0;
int64 deadlock_failures = 0;
+ int64 other_sql_failures = 0;
int64 retried = 0;
int64 retries = 0;
@@ -4677,10 +4757,12 @@ doLog(TState *thread, CState *st,
{
serialization_failures = agg->serialization_failures;
deadlock_failures = agg->deadlock_failures;
+ other_sql_failures = agg->other_sql_failures;
}
- fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT,
+ fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT " " INT64_FORMAT,
serialization_failures,
- deadlock_failures);
+ deadlock_failures,
+ other_sql_failures);
fputc('\n', logfile);
@@ -5120,9 +5202,9 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
* a blank-padded string in pgbench_accounts.
*/
static void
-initGenerateDataClientSide(PGconn *con)
+initGenerateDataClientSideText(PGconn *con)
{
- fprintf(stderr, "generating data (client-side)...\n");
+ fprintf(stderr, "TEXT mode...\n");
/*
* we do all of this in one transaction to enable the backend's
@@ -5138,25 +5220,384 @@ initGenerateDataClientSide(PGconn *con)
* already exist
*/
initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
- initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
+ initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
executeStatement(con, "commit");
}
+
/*
- * Fill the standard tables with some data generated on the server
- *
- * As already the case with the client-side data generation, the filler
- * column defaults to NULL in pgbench_branches and pgbench_tellers,
- * and is a blank-padded string in pgbench_accounts.
+ * Dumps binary buffer to file (purely for debugging)
*/
static void
-initGenerateDataServerSide(PGconn *con)
+dumpBufferToFile(char *filename)
+{
+ FILE *file_ptr;
+ size_t bytes_written;
+
+ file_ptr = fopen(filename, "wb");
+ if (file_ptr == NULL)
+ {
+ fprintf(stderr, "Error opening file %s\n", filename);
+ return; // EXIT_FAILURE;
+ }
+
+ bytes_written = fwrite(bin_copy_buffer, 1, bin_copy_buffer_length, file_ptr);
+
+ if (bytes_written != bin_copy_buffer_length)
+ {
+ fprintf(stderr, "Error writing to file or incomplete write\n");
+ fclose(file_ptr);
+ return; // EXIT_FAILURE;
+ }
+
+ fclose(file_ptr);
+}
+
+/*
+ * Save char data to buffer
+ */
+static void
+bufferCharData(char *src, int32_t len)
+{
+ memcpy((char *) bin_copy_buffer + bin_copy_buffer_length, (char *) src, len);
+ bin_copy_buffer_length += len;
+}
+
+/*
+ * Converts platform byte order into network byte order
+ * SPARC doesn't require that
+ */
+static void
+bufferData(void *src, int32_t len)
+{
+#ifdef __sparc__
+ bufferCharData(src, len);
+#else
+ if (len == 1)
+ bufferCharData(src, len);
+ else
+ for (int32_t i = 0; i < len; i++)
+ {
+ ((char *) bin_copy_buffer + bin_copy_buffer_length)[i] =
+ ((char *) src)[len - i - 1];
+ }
+
+ bin_copy_buffer_length += len;
+#endif
+}
+
+/*
+ * adds column counter
+ */
+static void
+addColumnCounter(int16_t n)
+{
+ bufferData((void *) &n, sizeof(n));
+}
+
+/*
+ * adds column with NULL value
+ */
+static void
+addNullColumn()
+{
+ int32_t null = -1;
+ bufferData((void *) &null, sizeof(null));
+}
+
+/*
+ * adds column with int8 value
+ */
+static void
+addInt8Column(int8_t value)
+{
+ int8_t data = value;
+ int32_t size = sizeof(data);
+ bufferData((void *) &size, sizeof(size));
+ bufferData((void *) &data, sizeof(data));
+}
+
+/*
+ * adds column with int16 value
+ */
+static void
+addInt16Column(int16_t value)
+{
+ int16_t data = value;
+ int32_t size = sizeof(data);
+ bufferData((void *) &size, sizeof(size));
+ bufferData((void *) &data, sizeof(data));
+}
+
+/*
+ * adds column with int32 value
+ */
+static void
+addInt32Column(int32_t value)
+{
+ int32_t data = value;
+ int32_t size = sizeof(data);
+ bufferData((void *) &size, sizeof(size));
+ bufferData((void *) &data, sizeof(data));
+}
+
+/*
+ * adds column with int64 value
+ */
+static void
+addInt64Column(int64_t value)
+{
+ int64_t data = value;
+ int32_t size = sizeof(data);
+ bufferData((void *) &size, sizeof(size));
+ bufferData((void *) &data, sizeof(data));
+}
+
+/*
+ * adds column with char value
+ */
+static void
+addCharColumn(char *value)
+{
+ int32_t size = strlen(value);
+ bufferData((void *) &size, sizeof(size));
+ bufferCharData(value, size);
+}
+
+/*
+ * Starts communication with server for COPY FROM BINARY statement
+ */
+static void
+sendBinaryCopyHeader(PGconn *con)
+{
+ char header[] = {'P','G','C','O','P','Y','\n','\377','\r','\n','\0',
+ '\0','\0','\0','\0',
+ '\0','\0','\0','\0' };
+
+ PQputCopyData(con, header, 19);
+}
+
+/*
+ * Finishes communication with server for COPY FROM BINARY statement
+ */
+static void
+sendBinaryCopyTrailer(PGconn *con)
+{
+ static char trailer[] = { 0xFF, 0xFF };
+
+ PQputCopyData(con, trailer, 2);
+}
+
+/*
+ * Flushes current buffer over network if needed
+ */
+static void
+flushBuffer(PGconn *con, PGresult *res, int16_t row_len)
+{
+ if (bin_copy_buffer_length + row_len > BIN_COPY_BUF_SIZE)
+ {
+ /* flush current buffer */
+ if (PQresultStatus(res) == PGRES_COPY_IN)
+ PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length);
+ bin_copy_buffer_length = 0;
+ }
+}
+
+/*
+ * Sends current branch row to buffer
+ */
+static void
+initBranchBinary(PGconn *con, PGresult *res, int64_t curr, int32_t parent)
+{
+ /*
+ * Each row has following extra bytes:
+ * - 2 bytes for number of columns
+ * - 4 bytes as length for each column
+ */
+ int16_t max_row_len = 35 + 2 + 4*3; /* max row size is 32 */
+
+ flushBuffer(con, res, max_row_len);
+
+ addColumnCounter(2);
+
+ addInt32Column(curr + 1);
+ addInt32Column(0);
+}
+
+/*
+ * Sends current teller row to buffer
+ */
+static void
+initTellerBinary(PGconn *con, PGresult *res, int64_t curr, int32_t parent)
+{
+ /*
+ * Each row has following extra bytes:
+ * - 2 bytes for number of columns
+ * - 4 bytes as length for each column
+ */
+ int16_t max_row_len = 40 + 2 + 4*4; /* max row size is 40 */
+
+ flushBuffer(con, res, max_row_len);
+
+ addColumnCounter(3);
+
+ addInt32Column(curr + 1);
+ addInt32Column(curr / parent + 1);
+ addInt32Column(0);
+}
+
+/*
+ * Sends current account row to buffer
+ */
+static void
+initAccountBinary(PGconn *con, PGresult *res, int64_t curr, int32_t parent)
+{
+ /*
+ * Each row has following extra bytes:
+ * - 2 bytes for number of columns
+ * - 4 bytes as length for each column
+ */
+ int16_t max_row_len = 250 + 2 + 4*4; /* max row size is 250 for int64 */
+
+ flushBuffer(con, res, max_row_len);
+
+ addColumnCounter(3);
+
+ if (scale <= SCALE_32BIT_THRESHOLD)
+ addInt32Column(curr + 1);
+ else
+ addInt64Column(curr);
+
+ addInt32Column(curr / parent + 1);
+ addInt32Column(0);
+}
+
+/*
+ * Universal wrapper for sending data in binary format
+ */
+static void
+initPopulateTableBinary(PGconn *con, char *table, char *columns,
+ int64_t base, initRowMethodBin init_row)
+{
+ int n;
+ PGresult *res;
+ char copy_statement[256];
+ const char *copy_statement_fmt = "copy %s (%s) from stdin (format binary)";
+ int64_t total = base * scale;
+
+ bin_copy_buffer_length = 0;
+
+ /* Use COPY with FREEZE on v14 and later for all ordinary tables */
+ if ((PQserverVersion(con) >= 140000) &&
+ get_table_relkind(con, table) == RELKIND_RELATION)
+ copy_statement_fmt = "copy %s (%s) from stdin with (format binary, freeze on)";
+
+ n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table, columns);
+ if (n >= sizeof(copy_statement))
+ pg_fatal("invalid buffer size: must be at least %d characters long", n);
+ else if (n == -1)
+ pg_fatal("invalid format string");
+
+ res = PQexec(con, copy_statement);
+
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
+ PQclear(res);
+
+
+ sendBinaryCopyHeader(con);
+
+ for (int64_t i = 0; i < total; i++)
+ {
+ init_row(con, res, i, base);
+ }
+
+ if (PQresultStatus(res) == PGRES_COPY_IN)
+ PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length);
+ else
+ fprintf(stderr, "Unexpected mode %d instead of %d\n", PQresultStatus(res), PGRES_COPY_IN);
+
+ sendBinaryCopyTrailer(con);
+
+ if (PQresultStatus(res) == PGRES_COPY_IN)
+ {
+ if (PQputCopyEnd(con, NULL) == 1) /* success */
+ {
+ res = PQgetResult(con);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ fprintf(stderr, "Error: %s\n", PQerrorMessage(con));
+ PQclear(res);
+ }
+ else
+ fprintf(stderr, "Error: %s\n", PQerrorMessage(con));
+ }
+}
+
+/*
+ * Wrapper for binary data load
+ */
+static void
+initGenerateDataClientSideBinary(PGconn *con)
+{
+
+ fprintf(stderr, "BINARY mode...\n");
+
+ bin_copy_buffer = pg_malloc(BIN_COPY_BUF_SIZE);
+ bin_copy_buffer_length = 0;
+
+ /*
+ * we do all of this in one transaction to enable the backend's
+ * data-loading optimizations
+ */
+ executeStatement(con, "begin");
+
+ /* truncate away any old data */
+ initTruncateTables(con);
+
+ initPopulateTableBinary(con, "pgbench_branches", "bid, bbalance",
+ nbranches, initBranchBinary);
+ initPopulateTableBinary(con, "pgbench_tellers", "tid, bid, tbalance",
+ ntellers, initTellerBinary);
+ initPopulateTableBinary(con, "pgbench_accounts", "aid, bid, abalance",
+ naccounts, initAccountBinary);
+
+ executeStatement(con, "commit");
+
+ pg_free(bin_copy_buffer);
+}
+
+/*
+ * Fill the standard tables with some data generated and sent from the client.
+ */
+static void
+initGenerateDataClientSide(PGconn *con)
+{
+ fprintf(stderr, "generating data (client-side) in ");
+
+ switch (data_generation_type)
+ {
+ case GEN_TYPE_COPY_ORIGINAL:
+ initGenerateDataClientSideText(con);
+ break;
+ case GEN_TYPE_COPY_BINARY:
+ initGenerateDataClientSideBinary(con);
+ break;
+ }
+}
+
+/*
+ * Generating data via INSERT .. SELECT .. FROM generate_series
+ * whole dataset in single transaction
+ */
+static void
+generateDataInsertSingleTXN(PGconn *con)
{
PQExpBufferData sql;
- fprintf(stderr, "generating data (server-side)...\n");
+ fprintf(stderr, "via INSERT .. SELECT generate_series... in single TXN\n");
+
/*
* we do all of this in one transaction to enable the backend's
@@ -5170,27 +5611,166 @@ initGenerateDataServerSide(PGconn *con)
initPQExpBuffer(&sql);
printfPQExpBuffer(&sql,
- "insert into pgbench_branches(bid,bbalance) "
+ "insert into pgbench_branches(bid, bbalance) "
"select bid, 0 "
- "from generate_series(1, %d) as bid", nbranches * scale);
+ "from generate_series(1, %d) as bid",
+ scale * nbranches);
executeStatement(con, sql.data);
printfPQExpBuffer(&sql,
- "insert into pgbench_tellers(tid,bid,tbalance) "
- "select tid, (tid - 1) / %d + 1, 0 "
- "from generate_series(1, %d) as tid", ntellers, ntellers * scale);
+ "insert into pgbench_tellers(tid, bid, tbalance) "
+ "select tid + 1, tid / %d + 1, 0 "
+ "from generate_series(0, %d) as tid",
+ ntellers, (scale * ntellers) - 1);
executeStatement(con, sql.data);
printfPQExpBuffer(&sql,
- "insert into pgbench_accounts(aid,bid,abalance,filler) "
- "select aid, (aid - 1) / %d + 1, 0, '' "
- "from generate_series(1, " INT64_FORMAT ") as aid",
- naccounts, (int64) naccounts * scale);
+ "insert into pgbench_accounts(aid, bid, abalance, "
+ "filler) "
+ "select aid + 1, aid / %d + 1, 0, '' "
+ "from generate_series(0, " INT64_FORMAT ") as aid",
+ naccounts, (int64) (scale * naccounts) - 1);
executeStatement(con, sql.data);
+ executeStatement(con, "commit");
+
termPQExpBuffer(&sql);
+}
+
+
+/*
+ * Generating data via INSERT .. SELECT .. FROM generate_series
+ * One transaction per 'scale'
+ */
+static void
+generateDataInsertSeries(PGconn *con)
+{
+ PQExpBufferData sql;
+
+ fprintf(stderr, "via INSERT .. SELECT generate_series... in multiple TXN(s)\n");
+
+ initPQExpBuffer(&sql);
+
+ executeStatement(con, "begin");
+
+ /* truncate away any old data */
+ initTruncateTables(con);
+
+ executeStatement(con, "commit");
+
+ for (int i = 0; i < scale; i++)
+ {
+ executeStatement(con, "begin");
+
+ printfPQExpBuffer(&sql,
+ "insert into pgbench_branches(bid, bbalance) "
+ "values(%d, 0)", i + 1);
+ executeStatement(con, sql.data);
+
+ printfPQExpBuffer(&sql,
+ "insert into pgbench_tellers(tid, bid, tbalance) "
+ "select tid + 1, tid / %d + 1, 0 "
+ "from generate_series(%d, %d) as tid",
+ ntellers, i * ntellers, (i + 1) * ntellers - 1);
+ executeStatement(con, sql.data);
+
+ printfPQExpBuffer(&sql,
+ "insert into pgbench_accounts(aid, bid, abalance, "
+ "filler) "
+ "select aid + 1, aid / %d + 1, 0, '' "
+ "from generate_series(" INT64_FORMAT ", "
+ INT64_FORMAT ") as aid",
+ naccounts, (int64) i * naccounts,
+ (int64) (i + 1) * naccounts - 1);
+ executeStatement(con, sql.data);
+
+ executeStatement(con, "commit");
+ }
+
+ termPQExpBuffer(&sql);
+}
+
+/*
+ * Generating data via INSERT .. SELECT .. FROM unnest
+ * One transaction per 'scale'
+ */
+static void
+generateDataInsertUnnest(PGconn *con)
+{
+ PQExpBufferData sql;
+
+ fprintf(stderr, "via INSERT .. SELECT unnest...\n");
+
+ initPQExpBuffer(&sql);
+
+ executeStatement(con, "begin");
+
+ /* truncate away any old data */
+ initTruncateTables(con);
executeStatement(con, "commit");
+
+ for (int s = 0; s < scale; s++)
+ {
+ executeStatement(con, "begin");
+
+ printfPQExpBuffer(&sql,
+ "insert into pgbench_branches(bid,bbalance) "
+ "values(%d, 0)", s + 1);
+ executeStatement(con, sql.data);
+
+ printfPQExpBuffer(&sql,
+ "insert into pgbench_tellers(tid, bid, tbalance) "
+ "select unnest(array_agg(s.i order by s.i)) as tid, "
+ "%d as bid, 0 as tbalance "
+ "from generate_series(%d, %d) as s(i)",
+ s + 1, s * ntellers + 1, (s + 1) * ntellers);
+ executeStatement(con, sql.data);
+
+ printfPQExpBuffer(&sql,
+ "with data as ("
+ " select generate_series(" INT64_FORMAT ", "
+ INT64_FORMAT ") as i) "
+ "insert into pgbench_accounts(aid, bid, "
+ "abalance, filler) "
+ "select unnest(aid), unnest(bid), 0 as abalance, "
+ "'' as filler "
+ "from (select array_agg(i+1) aid, "
+ "array_agg(i/%d + 1) bid from data)",
+ (int64) s * naccounts + 1,
+ (int64) (s + 1) * naccounts, naccounts);
+ executeStatement(con, sql.data);
+
+ executeStatement(con, "commit");
+ }
+
+ termPQExpBuffer(&sql);
+}
+
+/*
+ * Fill the standard tables with some data generated on the server
+ *
+ * As already the case with the client-side data generation, the filler
+ * column defaults to NULL in pgbench_branches and pgbench_tellers,
+ * and is a blank-padded string in pgbench_accounts.
+ */
+static void
+initGenerateDataServerSide(PGconn *con)
+{
+ fprintf(stderr, "generating data (server-side) ");
+
+ switch (data_generation_type)
+ {
+ case GEN_TYPE_INSERT_ORIGINAL:
+ generateDataInsertSingleTXN(con);
+ break;
+ case GEN_TYPE_INSERT_SERIES:
+ generateDataInsertSeries(con);
+ break;
+ case GEN_TYPE_INSERT_UNNEST:
+ generateDataInsertUnnest(con);
+ break;
+ }
}
/*
@@ -5276,6 +5856,8 @@ initCreateFKeys(PGconn *con)
static void
checkInitSteps(const char *initialize_steps)
{
+ char data_init_type = 0;
+
if (initialize_steps[0] == '\0')
pg_fatal("no initialization steps specified");
@@ -5287,7 +5869,22 @@ checkInitSteps(const char *initialize_steps)
pg_log_error_detail("Allowed step characters are: \"" ALL_INIT_STEPS "\".");
exit(1);
}
+
+ switch (*step)
+ {
+ case 'g':
+ case 'C':
+ case 'G':
+ case 'i':
+ case 'I':
+ data_init_type++;
+ data_generation_type = *step;
+ break;
+ }
}
+
+ if (data_init_type > 1)
+ pg_log_error("WARNING! More than one type of server-side data generation is requested");
}
/*
@@ -5326,10 +5923,13 @@ runInitSteps(const char *initialize_steps)
initCreateTables(con);
break;
case 'g':
+ case 'C':
op = "client-side generate";
initGenerateDataClientSide(con);
break;
case 'G':
+ case 'i':
+ case 'I':
op = "server-side generate";
initGenerateDataServerSide(con);
break;
@@ -6319,6 +6919,7 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
cur.serialization_failures +=
threads[i].stats.serialization_failures;
cur.deadlock_failures += threads[i].stats.deadlock_failures;
+ cur.other_sql_failures += threads[i].stats.other_sql_failures;
}
/* we count only actually executed transactions */
@@ -6461,7 +7062,8 @@ printResults(StatsData *total,
/*
* Remaining stats are nonsensical if we failed to execute any xacts due
- * to others than serialization or deadlock errors
+ * to other than serialization or deadlock errors and --continue-on-error
+ * is not set.
*/
if (total_cnt <= 0)
return;
@@ -6477,6 +7079,9 @@ printResults(StatsData *total,
printf("number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n",
total->deadlock_failures,
100.0 * total->deadlock_failures / total_cnt);
+ printf("number of other failures: " INT64_FORMAT " (%.3f%%)\n",
+ total->other_sql_failures,
+ 100.0 * total->other_sql_failures / total_cnt);
}
/* it can be non-zero only if max_tries is not equal to one */
@@ -6580,6 +7185,10 @@ printResults(StatsData *total,
sstats->deadlock_failures,
(100.0 * sstats->deadlock_failures /
script_total_cnt));
+ printf(" - number of other failures: " INT64_FORMAT " (%.3f%%)\n",
+ sstats->other_sql_failures,
+ (100.0 * sstats->other_sql_failures /
+ script_total_cnt));
}
/*
@@ -6739,6 +7348,7 @@ main(int argc, char **argv)
{"verbose-errors", no_argument, NULL, 15},
{"exit-on-abort", no_argument, NULL, 16},
{"debug", no_argument, NULL, 17},
+ {"continue-on-error", no_argument, NULL, 18},
{NULL, 0, NULL, 0}
};
@@ -7092,6 +7702,10 @@ main(int argc, char **argv)
case 17: /* debug */
pg_logging_increase_verbosity();
break;
+ case 18: /* continue-on-error */
+ benchmarking_option_set = true;
+ continue_on_error = true;
+ break;
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -7447,6 +8061,7 @@ main(int argc, char **argv)
stats.retried += thread->stats.retried;
stats.serialization_failures += thread->stats.serialization_failures;
stats.deadlock_failures += thread->stats.deadlock_failures;
+ stats.other_sql_failures += thread->stats.other_sql_failures;
latency_late += thread->latency_late;
conn_total_duration += thread->conn_duration;
view thread (20+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: Idea to enhance pgbench by more modes to generate data (multi-TXNs, UNNEST, COPY BINARY)
In-Reply-To: <DS0PR08MB9565D91414C65B3AC363825488CFA@DS0PR08MB9565.namprd08.prod.outlook.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox