From d9e4f5096ae9530b629629fb47ec08b922805739 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Fri, 6 Mar 2026 10:31:35 +0100 Subject: [PATCH v12] Adding new init modes to pgbench including COPY FROM BINARY as well as populating data in multiple transactions --- doc/src/sgml/ref/pgbench.sgml | 90 ++- src/bin/pgbench/pgbench.c | 802 ++++++++++++++++--- src/bin/pgbench/t/001_pgbench_with_server.pl | 236 +++++- 3 files changed, 978 insertions(+), 150 deletions(-) diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 2e401d1ceb8..20f41f332a5 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -217,41 +217,93 @@ pgbench options d - - g or G (Generate data, client-side or server-side) + + + g or c (Generate data, client-side) - Generate data and load it into the standard tables, - replacing any data already present. - - - With g (client-side data generation), - data is generated in pgbench client and then - sent to the server. This uses the client/server bandwidth + Generate data and load it into the standard tables using client-side + generation. Data is generated in the pgbench client + and then sent to the server. This uses the client/server bandwidth extensively through a COPY. pgbench uses the option to load data into ordinary (non-partition) tables with version 14 or later of PostgreSQL to speed up - subsequent VACUUM. - Using g causes logging to - print one message every 100,000 rows while generating data for all - tables. + subsequent VACUUM. Using g + or c causes logging to print one message + every 100,000 rows while generating data for all tables. + + + With g (text format), the client sends data in + COPY ... FROM STDIN (FORMAT TEXT). + This is default mode for data initialization. + + + With c (binary format), the client sends data using + COPY ... FROM STDIN (FORMAT BINARY). Binary transfer + can be faster than text, but may be less portable across different + versions or platforms. - With G (server-side data generation), + The default initialization behavior uses client-side data generation + with text format (equivalent to g). + + + + + + G or U (Generate data, server-side) + + + With G or U (server-side data generation), only small queries are sent from the pgbench client and then data is actually generated in the server. No significant bandwidth is required for this variant, but the server will do more work. - Using G causes logging not to print any progress - message while generating data. + Using G or U causes logging not to print + any progress message while generating data. + + + With G, the server uses INSERT ... + SELECT generate_series(...) to produce the rows. This method + is simple and widely used. + + + With U, the server uses INSERT ... + SELECT unnest(...) to expand arrays of generated values. This can + be more efficient when multiple columns are generated together for column-based + table, as the work is done in a single scan. + + + + + + S or M (Transaction mode for initialization) + + + Specifies whether the initialization steps are performed in a single + transaction or in multiple transactions. + + + With S (single transaction), all data generation + are executed within one transaction. This is the default + behavior. Using a single transaction allows the use of the + option with client-side COPY. - The default initialization behavior uses client-side data - generation (equivalent to g). + With M (multiple transactions), data for each scale factor + unit (e.g., for each increment of ) is loaded in a + separate transaction. For example, if you set + data will be added to three main tables in 20 separate transactions. + In this mode, the option + cannot be used with client-side COPY. + Using multiple transactions can speed up data generation by + committing data in smaller batches, which may reduce the overhead of a + single huge transaction, but also adds per-transaction commit costs. + v (Vacuum) @@ -260,6 +312,7 @@ pgbench options d + p (create Primary keys) @@ -268,6 +321,7 @@ pgbench options d + f (create Foreign keys) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1dae918cc09..48f2950e245 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -159,18 +159,36 @@ typedef struct socket_set /******************************************************************** * some configurable parameters */ - #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ -#define ALL_INIT_STEPS "dtgGvpf" /* all possible steps */ +#define ALL_INIT_STEPS "dtMScgGUvpf" /* all possible steps */ #define LOG_STEP_SECONDS 5 /* seconds between log messages */ -#define DEFAULT_NXACTS 10 /* default nxacts */ +#define DEFAULT_NXACTS 10 /* default nxacts */ #define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */ #define MIN_ZIPFIAN_PARAM 1.001 /* minimum parameter for zipfian */ #define MAX_ZIPFIAN_PARAM 1000.0 /* maximum parameter for zipfian */ +/* server-side methods to generate data */ +#define INIT_STEP_GEN_TYPE_INSERT_SERIES 'G' /* use INSERT .. SELECT + * generate_series to generate + * data */ +#define INIT_STEP_GEN_TYPE_INSERT_UNNEST 'U' /* use INSERT .. SELECT unnest + * to generate data */ +/* client-side methods to generate data */ +#define INIT_STEP_GEN_TYPE_COPY_TEXT 'g' /* use COPY .. FROM STDIN .. + * TEXT to generate data */ +#define INIT_STEP_GEN_TYPE_COPY_BINARY 'c' /* use COPY .. FROM STDIN .. + * BINARY to generate data */ +/* data init pseudo steps */ +#define INIT_STEP_GEN_TYPE_SINGLE_XACT 'S' /* switch to init data as + * single transaction */ +#define INIT_STEP_GEN_TYPE_MULTI_XACT 'M' /* switch to init data as + * multiple transactions */ + +static bool multi_xact = false; /* init data type (as single or multiple + * transactions) */ static int nxacts = 0; /* number of transactions per client */ static int duration = 0; /* duration in seconds */ static int64 end_time = 0; /* when to stop in micro seconds, under -T */ @@ -181,6 +199,19 @@ static int64 end_time = 0; /* when to stop in micro seconds, under -T */ */ static int scale = 1; +/* + * mode of data generation to use + */ +static char data_generation_type = INIT_STEP_GEN_TYPE_COPY_TEXT; + +/* + * COPY FROM BINARY execution buffer + */ +#define BIN_COPY_BUF_SIZE 102400 /* maximum buffer size for COPY FROM + * BINARY */ +static char *bin_copy_buffer = NULL; /* buffer for COPY FROM BINARY */ +static int32_t bin_copy_buffer_length = 0; /* current buffer size */ + /* * fillfactor. for example, fillfactor = 90 will use only 90 percent * space during inserts and leave 10 percent free. @@ -456,6 +487,9 @@ typedef struct StatsData */ static pg_time_usec_t epoch_shift; +/* used to track elapsed time and estimate of the remaining time of data load */ +static pg_time_usec_t data_load_start; + /* * Error status for errors during script execution. */ @@ -851,6 +885,7 @@ static bool socket_has_input(socket_set *sa, int fd, int idx); /* callback used to build rows for COPY during data loading */ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr); +typedef void (*initRowMethodBinary) (PGconn *con, int64_t curr, int32_t parent); /* callback functions for our flex lexer */ static const PsqlScanCallbacks pgbench_callbacks = { @@ -913,8 +948,14 @@ usage(void) " run selected initialization steps, in the specified order\n" " d: drop any existing pgbench tables\n" " t: create the tables used by the standard pgbench scenario\n" - " g: generate data, client-side\n" - " G: generate data, server-side\n" + " to generate data, client-side:\n" + " g: COPY .. FROM STDIN .. TEXT\n" + " c: COPY .. FROM STDIN .. BINARY\n" + " to generate data, server-side:\n" + " G: INSERT .. SELECT generate_series\n" + " U: INSERT .. SELECT unnest\n" + " S: flag to use single transaction to initialize data\n" + " M: flag to use multiple transactions to initialize data\n" " v: invoke VACUUM on the standard tables\n" " p: create primary key indexes on the standard tables\n" " f: create foreign keys between the standard tables\n" @@ -4916,8 +4957,8 @@ initCreateTables(PGconn *con) static const struct ddlinfo DDLs[] = { { "pgbench_history", - "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)", - "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)", + "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22) default ''", + "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22) default ''", 0 }, { @@ -4928,8 +4969,8 @@ initCreateTables(PGconn *con) }, { "pgbench_accounts", - "aid int not null,bid int,abalance int,filler char(84)", - "aid bigint not null,bid int,abalance int,filler char(84)", + "aid int not null,bid int,abalance int,filler char(84) default ''", + "aid bigint not null,bid int,abalance int,filler char(84) default ''", 1 }, { @@ -5025,31 +5066,89 @@ initAccount(PQExpBufferData *sql, int64 curr) } static void -initPopulateTable(PGconn *con, const char *table, int64 base, - initRowMethod init_row) +showPopulateTableCopyProgress(const char *table, int64 current, int64 total) +{ + static int chars = 0; + static int prev_chars = 0; + static int log_interval = 1; + + /* Stay on the same line if reporting to a terminal */ + char eol = isatty(fileno(stderr)) ? '\r' : '\n'; + + double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - data_load_start); + double remaining_sec = ((double) total - current) * elapsed_sec / current; + + /* + * If we want to stick with the original logging, print a message each + * 100k inserted rows. + */ + if ((!use_quiet) && (current % 100000 == 0)) + { + chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", + current, total, + (int) ((current * 100) / total), + table, elapsed_sec, remaining_sec); + + /* + * If the previous progress message is longer than the current one, + * add spaces to the current line to fully overwrite any remaining + * characters from the previous message. + */ + if (prev_chars > chars) + fprintf(stderr, "%*c", prev_chars - chars, ' '); + fputc(eol, stderr); + prev_chars = chars; + } + /* let's not call the timing for each row, but only each 100 rows */ + else if (use_quiet && (current % 100 == 0)) + { + /* have we reached the next interval (or end)? */ + if ((current == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) + { + chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", + current, total, + (int) ((current * 100) / total), + table, elapsed_sec, remaining_sec); + + /* + * If the previous progress message is longer than the current + * one, add spaces to the current line to fully overwrite any + * remaining characters from the previous message. + */ + if (prev_chars > chars) + fprintf(stderr, "%*c", prev_chars - chars, ' '); + fputc(eol, stderr); + prev_chars = chars; + + /* skip to the next interval */ + log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS); + } + } + + if (current + 1 == total && chars != 0) + { + fprintf(stderr, "%*c", chars, ' '); /* Clear the current line */ + fputc(eol, stderr); + } +} + +static void +initPopulateTableCopyText(PGconn *con, const char *table, int counter, int64 base, + initRowMethod init_row) { int n; - int64 k; - int chars = 0; - int prev_chars = 0; PGresult *res; PQExpBufferData sql; char copy_statement[256]; const char *copy_statement_fmt = "copy %s from stdin"; - int64 total = base * scale; - - /* used to track elapsed time and estimate of the remaining time */ - pg_time_usec_t start; - int log_interval = 1; - - /* Stay on the same line if reporting to a terminal */ - char eol = isatty(fileno(stderr)) ? '\r' : '\n'; + int64 start = base * counter; initPQExpBuffer(&sql); /* Use COPY with FREEZE on v14 and later for all ordinary tables */ if ((PQserverVersion(con) >= 140000) && - get_table_relkind(con, table) == RELKIND_RELATION) + get_table_relkind(con, table) == RELKIND_RELATION && + !multi_xact) copy_statement_fmt = "copy %s from stdin with (freeze on)"; @@ -5065,75 +5164,18 @@ initPopulateTable(PGconn *con, const char *table, int64 base, pg_fatal("unexpected copy in result: %s", PQerrorMessage(con)); PQclear(res); - start = pg_time_now(); - - for (k = 0; k < total; k++) + for (int64_t i = start; i < start + base; i++) { - int64 j = k + 1; - - init_row(&sql, k); + init_row(&sql, i); if (PQputline(con, sql.data)) pg_fatal("PQputline failed"); if (CancelRequested) break; - /* - * If we want to stick with the original logging, print a message each - * 100k inserted rows. - */ - if ((!use_quiet) && (j % 100000 == 0)) - { - double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start); - double remaining_sec = ((double) total - j) * elapsed_sec / j; - - chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", - j, total, - (int) ((j * 100) / total), - table, elapsed_sec, remaining_sec); - - /* - * If the previous progress message is longer than the current - * one, add spaces to the current line to fully overwrite any - * remaining characters from the previous message. - */ - if (prev_chars > chars) - fprintf(stderr, "%*c", prev_chars - chars, ' '); - fputc(eol, stderr); - prev_chars = chars; - } - /* let's not call the timing for each row, but only each 100 rows */ - else if (use_quiet && (j % 100 == 0)) - { - double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start); - double remaining_sec = ((double) total - j) * elapsed_sec / j; - - /* have we reached the next interval (or end)? */ - if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) - { - chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)", - j, total, - (int) ((j * 100) / total), - table, elapsed_sec, remaining_sec); - - /* - * If the previous progress message is longer than the current - * one, add spaces to the current line to fully overwrite any - * remaining characters from the previous message. - */ - if (prev_chars > chars) - fprintf(stderr, "%*c", prev_chars - chars, ' '); - fputc(eol, stderr); - prev_chars = chars; - - /* skip to the next interval */ - log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS); - } - } + showPopulateTableCopyProgress(table, i, base * scale); } - if (chars != 0 && eol != '\n') - fprintf(stderr, "%*c\r", chars, ' '); /* Clear the current line */ if (PQputline(con, "\\.\n")) pg_fatal("very last PQputline failed"); @@ -5150,9 +5192,9 @@ initPopulateTable(PGconn *con, const char *table, int64 base, * a blank-padded string in pgbench_accounts. */ static void -initGenerateDataClientSide(PGconn *con) +initGenerateDataClientSideTextFrmt(PGconn *con) { - fprintf(stderr, "generating data (client-side)...\n"); + fprintf(stderr, "TEXT mode...\n"); /* * we do all of this in one transaction to enable the backend's @@ -5163,64 +5205,549 @@ initGenerateDataClientSide(PGconn *con) /* truncate away any old data */ initTruncateTables(con); - /* - * fill branches, tellers, accounts in that order in case foreign keys - * already exist + if (multi_xact) + executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + if (multi_xact) + executeStatement(con, "begin"); + + /* + * fill branches, tellers, accounts in that order in case foreign keys + * already exist + */ + initPopulateTableCopyText(con, "pgbench_branches", i, nbranches, initBranch); + initPopulateTableCopyText(con, "pgbench_tellers", i, ntellers, initTeller); + initPopulateTableCopyText(con, "pgbench_accounts", i, naccounts, initAccount); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); +} + + +/* + * Save char data to buffer + * Kept as separate proc for possible addition of something + * like addCharColumn in future + */ +static void +bufferCharData(char *src, int32_t len) +{ + Assert(bin_copy_buffer_length + len <= BIN_COPY_BUF_SIZE); + + memcpy((char *) bin_copy_buffer + bin_copy_buffer_length, (char *) src, len); + bin_copy_buffer_length += len; +} + +/* + * Converts platform byte order into network byte order + * SPARC doesn't reqire that + */ +static void +bufferData(void *src, int32_t len) +{ + Assert(bin_copy_buffer_length + len <= BIN_COPY_BUF_SIZE); + +#ifdef __sparc__ + bufferCharData(src, len); +#else + + if (len == 1) + bufferCharData(src, len); + else + { + for (int32_t i = 0; i < len; i++) + { + ((char *) bin_copy_buffer + bin_copy_buffer_length)[i] = + ((char *) src)[len - i - 1]; + } + + bin_copy_buffer_length += len; + } +#endif +} + +/* + * adds column counter + */ +static void +addColumnCounter(int16_t n) +{ + bufferData((void *) &n, sizeof(n)); +} + +/* + * adds column with inti32 value + */ +static void +addInt32Column(int32_t value) +{ + int32_t data = value; + int32_t size = sizeof(data); + + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); +} + +/* + * adds column with inti64 value + */ +static void +addInt64Column(int64_t value) +{ + int64_t data = value; + int32_t size = sizeof(data); + + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); +} + +/* + * Starts communication with server for COPY FROM BINARY statement + */ +static void +sendBinaryCopyHeader(PGconn *con) +{ + static char header[] = {'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0', + '\0', '\0', '\0', '\0', + '\0', '\0', '\0', '\0'}; + + PQputCopyData(con, header, 19); +} + +/* + * Finishes communication with server for COPY FROM BINARY statement + */ +static void +sendBinaryCopyTrailer(PGconn *con) +{ + static char trailer[] = {0xFF, 0xFF}; + + PQputCopyData(con, trailer, 2); +} + +/* + * Flashes current buffer over network if needed + */ +static void +flushBuffer(PGconn *con, int16_t row_len) +{ + PGresult *res; + + if (bin_copy_buffer_length + row_len > BIN_COPY_BUF_SIZE) + { + res = PQgetResult(con); + + Assert(bin_copy_buffer_length <= BIN_COPY_BUF_SIZE); + + /* flush current buffer */ + if (PQresultStatus(res) == PGRES_COPY_IN) + PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length); + else + pg_fatal("It is NOT a COPY command that is currently running"); + + PQclear(res); + bin_copy_buffer_length = 0; + } +} + +/* + * Sends current branch row to buffer + */ +static void +initBranchBinary(PGconn *con, int64_t curr, int32_t parent) +{ + /*--- + * Check documentation about COPY command: + * https://www.postgresql.org/docs/current/sql-copy.html + * + * Each row of branches table is sent as: + * - 2 bytes for number of columns in tuple or sizeof(int16_t) + * - then 4 bytes or sizeof(int32_t) in front of each field with length of the field + * + * - branches table has following columns: + * - 4 bytes for bid column or sizeof(int32_t) + * - 4 bytes for bbalance column or sizeof(int32_t) + * - 88 bytes for filler column (optional since no requirement for row length) + *--- */ - initPopulateTable(con, "pgbench_branches", nbranches, initBranch); - initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); - initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); + /* following is our max intent at the moment */ + int16_t max_row_len = 2 + (4 + 4) + (4 + 4) + (4 + 88); - executeStatement(con, "commit"); + flushBuffer(con, max_row_len); + + addColumnCounter(2); + + addInt32Column(curr + 1); + addInt32Column(0); + /* we don't send filler column here to minimize network traffic and WAL */ } /* - * Fill the standard tables with some data generated on the server - * - * As already the case with the client-side data generation, the filler - * column defaults to NULL in pgbench_branches and pgbench_tellers, - * and is a blank-padded string in pgbench_accounts. + * Sends current teller row to buffer */ static void -initGenerateDataServerSide(PGconn *con) +initTellerBinary(PGconn *con, int64_t curr, int32_t parent) { - PQExpBufferData sql; + /*--- + * Check documentation about COPY command: + * https://www.postgresql.org/docs/current/sql-copy.html + * + * Each row of tellers table is sent as: + * - 2 bytes for number of columns in tuple or sizeof(int16_t) + * - then 4 bytes or sizeof(int32_t) in front of each field with length of the field + * + * - tellers table has following columns: + * - 4 bytes for tid column or sizeof(int32_t) + * - 4 bytes for bid column or sizeof(int32_t) + * - 4 bytes for tbalance column or sizeof(int32_t) + * - 84 bytes for filler column (optional since no requirement for row length) + *--- + */ + /* following is our max intent at the moment */ + int16_t max_row_len = 2 + (4 + 4) + (4 + 4) + (4 + 4) + (4 + 84); + + flushBuffer(con, max_row_len); + + addColumnCounter(3); + + addInt32Column(curr + 1); + addInt32Column(curr / parent + 1); + addInt32Column(0); + /* we don't send filler column here to minimize network traffic and WAL */ +} + +/* + * Sends current account row to buffer + */ +static void +initAccountBinary(PGconn *con, int64_t curr, int32_t parent) +{ + /*--- + * Check documentation about COPY command: + * https://www.postgresql.org/docs/current/sql-copy.html + * + * Each row of accounts table is sent as: + * - 2 bytes for number of columns in tuple or sizeof(int16_t) + * - then 4 bytes or sizeof(int32_t) in front of each field with length of the field + * + * - accounts table has following columns (taking into account scale > 20000): + * - 8 bytes for aid column or sizeof(int64_t) + * - 4 bytes for bid column or sizeof(int32_t) + * - 4 bytes for abalance column or sizeof(int32_t) + * - 84 bytes for filler column (optional since no requirement for row length) + *--- + */ + /* following is our max intent at the moment */ + int16_t max_row_len = 2 + (4 + 8) + (4 + 4) + (4 + 4) + (4 + 84); + + flushBuffer(con, max_row_len); + + addColumnCounter(3); + + if (scale <= SCALE_32BIT_THRESHOLD) + addInt32Column(curr + 1); + else + addInt64Column(curr); + + addInt32Column(curr / parent + 1); + addInt32Column(0); + /* we don't send filler column here to minimize network traffic and WAL */ +} + +/* + * Universal wrapper for sending data in binary format + */ +static void +initPopulateTableCopyBinary(PGconn *con, char *table, char *columns, + int counter, int64_t base, initRowMethodBinary init_row) +{ + int n; + PGresult *res; + char copy_statement[256]; + const char *copy_statement_fmt = "copy %s (%s) from stdin (format binary)"; + int64_t start = base * counter; + + bin_copy_buffer_length = 0; + + /* Use COPY with FREEZE on v14 and later for all ordinary tables */ + if ((PQserverVersion(con) >= 140000) && + get_table_relkind(con, table) == RELKIND_RELATION && + !multi_xact) + copy_statement_fmt = "copy %s (%s) from stdin with (format binary, freeze on)"; + + n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table, columns); + if (n >= sizeof(copy_statement)) + pg_fatal("invalid buffer size: must be at least %d characters long", n); + else if (n == -1) + pg_fatal("invalid format string"); + + res = PQexec(con, copy_statement); + + if (PQresultStatus(res) != PGRES_COPY_IN) + pg_fatal("unexpected copy in result: %s", PQerrorMessage(con)); + PQclear(res); + + + sendBinaryCopyHeader(con); + + + for (int64_t i = start; i < start + base; i++) + { + init_row(con, i, base); + + if (CancelRequested) + break; + + showPopulateTableCopyProgress(table, i, base * scale); + } + + res = PQgetResult(con); + + Assert(bin_copy_buffer_length <= BIN_COPY_BUF_SIZE); + + if (PQresultStatus(res) == PGRES_COPY_IN) + PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length); + else + fprintf(stderr, "Unexpected mode %d instead of %d\n", PQresultStatus(res), PGRES_COPY_IN); + PQclear(res); + + + sendBinaryCopyTrailer(con); + + + res = PQgetResult(con); + if (PQresultStatus(res) == PGRES_COPY_IN) + { + if (PQputCopyEnd(con, NULL) == 1) /* success */ + { + PQclear(res); + res = PQgetResult(con); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + fprintf(stderr, "Error: %s\n", PQerrorMessage(con)); + } + else + fprintf(stderr, "Error: %s\n", PQerrorMessage(con)); + } + PQclear(res); +} + +/* + * Wrapper for binary data load + */ +static void +initGenerateDataClientSideBinaryFrmt(PGconn *con) +{ + + fprintf(stderr, "BINARY mode...\n"); - fprintf(stderr, "generating data (server-side)...\n"); + bin_copy_buffer = pg_malloc(BIN_COPY_BUF_SIZE); + bin_copy_buffer_length = 0; /* - * we do all of this in one transaction to enable the backend's - * data-loading optimizations + * we do all of this in multiple transactions to minimize load on DB + * server and perhaps in future allow load in parallel sessions */ executeStatement(con, "begin"); /* truncate away any old data */ initTruncateTables(con); + if (multi_xact) + executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + if (multi_xact) + executeStatement(con, "begin"); + + initPopulateTableCopyBinary(con, "pgbench_branches", "bid, bbalance", + i, nbranches, initBranchBinary); + initPopulateTableCopyBinary(con, "pgbench_tellers", "tid, bid, tbalance", + i, ntellers, initTellerBinary); + initPopulateTableCopyBinary(con, "pgbench_accounts", "aid, bid, abalance", + i, naccounts, initAccountBinary); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); + + pg_free(bin_copy_buffer); +} + +/* + * Fill the standard tables with some data generated and sent from the client. + */ +static void +initGenerateDataClientSide(PGconn *con) +{ + fprintf(stderr, "generating data (client-side as %s transaction%s) in ", + multi_xact ? "multiple" : "single", multi_xact ? "s" : ""); + + data_load_start = pg_time_now(); + + switch (data_generation_type) + { + case INIT_STEP_GEN_TYPE_COPY_TEXT: + initGenerateDataClientSideTextFrmt(con); + break; + case INIT_STEP_GEN_TYPE_COPY_BINARY: + initGenerateDataClientSideBinaryFrmt(con); + break; + } +} + +/* + * Generating data via INSERT .. SELECT .. FROM generate_series + * Possibly as "One transaction per scale" in multi-transaction mode + */ +static void +generateDataInsertSeries(PGconn *con) +{ + PQExpBufferData sql; + + fprintf(stderr, "via INSERT .. SELECT generate_series... in multiple TXN(s)\n"); + initPQExpBuffer(&sql); - printfPQExpBuffer(&sql, - "insert into pgbench_branches(bid,bbalance) " - "select bid, 0 " - "from generate_series(1, %d) as bid", nbranches * scale); - executeStatement(con, sql.data); - - printfPQExpBuffer(&sql, - "insert into pgbench_tellers(tid,bid,tbalance) " - "select tid, (tid - 1) / %d + 1, 0 " - "from generate_series(1, %d) as tid", ntellers, ntellers * scale); - executeStatement(con, sql.data); - - printfPQExpBuffer(&sql, - "insert into pgbench_accounts(aid,bid,abalance,filler) " - "select aid, (aid - 1) / %d + 1, 0, '' " - "from generate_series(1, " INT64_FORMAT ") as aid", - naccounts, (int64) naccounts * scale); - executeStatement(con, sql.data); + executeStatement(con, "begin"); + + /* truncate away any old data */ + initTruncateTables(con); + + if (multi_xact) + executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + if (multi_xact) + executeStatement(con, "begin"); + + printfPQExpBuffer(&sql, + "insert into pgbench_branches(bid, bbalance) " + "values(%d, 0)", i + 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_tellers(tid, bid, tbalance) " + "select tid + 1, tid / %d + 1, 0 " + "from generate_series(%d, %d) as tid", + ntellers, i * ntellers, (i + 1) * ntellers - 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_accounts(aid, bid, abalance, " + "filler) " + "select aid + 1, aid / %d + 1, 0, '' " + "from generate_series(" INT64_FORMAT ", " + INT64_FORMAT ") as aid", + naccounts, (int64) i * naccounts, + (int64) (i + 1) * naccounts - 1); + executeStatement(con, sql.data); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); termPQExpBuffer(&sql); +} + +/* + * Generating data via INSERT .. SELECT .. FROM unnest + * Possibly as "One transaction per scale" in multi-tansaction mode + */ +static void +generateDataInsertUnnest(PGconn *con) +{ + PQExpBufferData sql; - executeStatement(con, "commit"); + fprintf(stderr, "via INSERT .. SELECT unnest...\n"); + + initPQExpBuffer(&sql); + + executeStatement(con, "begin"); + + /* truncate away any old data */ + initTruncateTables(con); + + if (multi_xact) + executeStatement(con, "commit"); + + for (int s = 0; s < scale; s++) + { + if (multi_xact) + executeStatement(con, "begin"); + + printfPQExpBuffer(&sql, + "insert into pgbench_branches(bid,bbalance) " + "values(%d, 0)", s + 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_tellers(tid, bid, tbalance) " + "select unnest(array_agg(s.i order by s.i)) as tid, " + "%d as bid, 0 as tbalance " + "from generate_series(%d, %d) as s(i)", + s + 1, s * ntellers + 1, (s + 1) * ntellers); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "with data as (" + " select generate_series(" INT64_FORMAT ", " + INT64_FORMAT ") as i) " + "insert into pgbench_accounts(aid, bid, " + "abalance, filler) " + "select unnest(aid), unnest(bid), 0 as abalance, " + "'' as filler " + "from (select array_agg(i+1) aid, " + "array_agg(i/%d + 1) bid from data)", + (int64) s * naccounts + 1, + (int64) (s + 1) * naccounts, naccounts); + executeStatement(con, sql.data); + + if (multi_xact) + executeStatement(con, "commit"); + } + + if (!multi_xact) + executeStatement(con, "commit"); + + termPQExpBuffer(&sql); +} + +/* + * Fill the standard tables with some data generated on the server side + * + * As already the case with the client-side data generation, the filler + * column defaults to NULL in pgbench_branches and pgbench_tellers, + * and is a blank-padded string in pgbench_accounts. + */ +static void +initGenerateDataServerSide(PGconn *con) +{ + fprintf(stderr, "generating data (server-side as %s transaction%s) ", + multi_xact ? "multiple" : "single", multi_xact ? "s" : ""); + + switch (data_generation_type) + { + case INIT_STEP_GEN_TYPE_INSERT_SERIES: + generateDataInsertSeries(con); + break; + case INIT_STEP_GEN_TYPE_INSERT_UNNEST: + generateDataInsertUnnest(con); + break; + } } /* @@ -5306,6 +5833,8 @@ initCreateFKeys(PGconn *con) static void checkInitSteps(const char *initialize_steps) { + char data_init_type = 0; + if (initialize_steps[0] == '\0') pg_fatal("no initialization steps specified"); @@ -5317,7 +5846,22 @@ checkInitSteps(const char *initialize_steps) pg_log_error_detail("Allowed step characters are: \"" ALL_INIT_STEPS "\"."); exit(1); } + + switch (*step) + { + case INIT_STEP_GEN_TYPE_COPY_TEXT: + case INIT_STEP_GEN_TYPE_COPY_BINARY: + case INIT_STEP_GEN_TYPE_INSERT_SERIES: + case INIT_STEP_GEN_TYPE_INSERT_UNNEST: + data_init_type++; + break; + } } + + if (data_init_type == 0) + pg_log_warning("No data generation type is provided"); + if (data_init_type > 1) + pg_log_warning("More than one type of data initialization is requested"); } /* @@ -5355,14 +5899,24 @@ runInitSteps(const char *initialize_steps) op = "create tables"; initCreateTables(con); break; - case 'g': + case INIT_STEP_GEN_TYPE_COPY_TEXT: + case INIT_STEP_GEN_TYPE_COPY_BINARY: op = "client-side generate"; + data_generation_type = *step; initGenerateDataClientSide(con); break; - case 'G': + case INIT_STEP_GEN_TYPE_INSERT_SERIES: + case INIT_STEP_GEN_TYPE_INSERT_UNNEST: op = "server-side generate"; + data_generation_type = *step; initGenerateDataServerSide(con); break; + case INIT_STEP_GEN_TYPE_SINGLE_XACT: + multi_xact = false; + break; + case INIT_STEP_GEN_TYPE_MULTI_XACT: + multi_xact = true; + break; case 'v': op = "vacuum"; initVacuum(con); @@ -6940,7 +7494,6 @@ main(int argc, char **argv) case 'I': pg_free(initialize_steps); initialize_steps = pg_strdup(optarg); - checkInitSteps(initialize_steps); initialization_option_set = true; break; case 'j': /* jobs */ @@ -7245,6 +7798,7 @@ main(int argc, char **argv) } } + checkInitSteps(initialize_steps); runInitSteps(initialize_steps); exit(0); } diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl index b7685ea5d20..6c7783a77f7 100644 --- a/src/bin/pgbench/t/001_pgbench_with_server.pl +++ b/src/bin/pgbench/t/001_pgbench_with_server.pl @@ -16,25 +16,30 @@ sub check_data_state local $Test::Builder::Level = $Test::Builder::Level + 1; my $node = shift; my $type = shift; + my $sql_result; - my $sql_result = $node->safe_psql('postgres', - 'SELECT count(*) AS null_count FROM pgbench_accounts WHERE filler IS NULL LIMIT 10;' - ); - is($sql_result, '0', - "$type: filler column of pgbench_accounts has no NULL data"); $sql_result = $node->safe_psql('postgres', 'SELECT count(*) AS null_count FROM pgbench_branches WHERE filler IS NULL;' ); is($sql_result, '1', "$type: filler column of pgbench_branches has only NULL data"); + $sql_result = $node->safe_psql('postgres', 'SELECT count(*) AS null_count FROM pgbench_tellers WHERE filler IS NULL;' ); is($sql_result, '10', "$type: filler column of pgbench_tellers has only NULL data"); + + $sql_result = $node->safe_psql('postgres', + 'SELECT count(*) AS null_count FROM pgbench_accounts WHERE filler IS NULL LIMIT 10;' + ); + is($sql_result, '0', + "$type: filler column of pgbench_accounts has no NULL data"); + $sql_result = $node->safe_psql('postgres', 'SELECT count(*) AS data_count FROM pgbench_history;'); - is($sql_result, '0', "$type: pgbench_history has no data"); + is($sql_result, '0', + "$type: pgbench_history has no data"); } # start a pgbench specific server @@ -112,6 +117,7 @@ $node->pgbench( [qr{Perhaps you need to do initialization}], 'run without init'); + # Initialize pgbench tables scale 1 $node->pgbench( '-i', 0, @@ -125,7 +131,7 @@ $node->pgbench( 'pgbench scale 1 initialization',); # Check data state, after client-side data generation. -check_data_state($node, 'client-side'); +check_data_state($node, 'client-side (default options)'); # Again, with all possible options $node->pgbench( @@ -143,6 +149,7 @@ $node->pgbench( qr{done in \d+\.\d\d s } ], 'pgbench scale 1 initialization'); +check_data_state($node, 'client-side (all options)'); # Test interaction of --init-steps with legacy step-selection options $node->pgbench( @@ -154,7 +161,7 @@ $node->pgbench( qr{creating tables}, qr{creating 3 partitions}, qr{creating primary keys}, - qr{generating data \(server-side\)}, + qr{generating data \(server-side as single transaction\)}, qr{creating foreign keys}, qr{(?!vacuuming)}, # no vacuum qr{done in \d+\.\d\d s } @@ -164,6 +171,219 @@ $node->pgbench( # Check data state, after server-side data generation. check_data_state($node, 'server-side'); + +# Test server-side generation with generate_series +$node->pgbench( + '--initialize --init-steps=dtG', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side generate_series'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (generate_series)'); + +$node->pgbench( + '--initialize --init-steps=dtSG', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side generate_series'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (generate_series single XACT)'); + +$node->pgbench( + '--initialize --init-steps=dtMG', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as multiple transactions\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side generate_series'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (generate_series multiple XACTs)'); + + +# Test server-side generation with UNNEST +$node->pgbench( + '--initialize --init-steps=dtU', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side UNNEST'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (unnest)'); + +$node->pgbench( + '--initialize --init-steps=dtSU', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as single transaction\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side UNNEST'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (unnest)'); + +$node->pgbench( + '--initialize --init-steps=dtMU', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(server-side as multiple transactions\)}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps server-side UNNEST'); + +# Check data state, after server-side data generation. +check_data_state($node, 'server-side (unnest)'); + + +# Test client-side generation with COPY TEXT +$node->pgbench( + '--initialize --init-steps=dtg', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side TEXT (single XACT #1)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (text)'); + +$node->pgbench( + '--initialize --init-steps=dtSg', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{\d of \d+ tuples \(\d%\) of pgbench_branches done}, + qr{\d of \d+ tuples \(\d%\) of pgbench_tellers done}, + qr{\d of \d+ tuples \(\d%\) of pgbench_accounts done}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side TEXT (single XACT #2)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (text)'); + +$node->pgbench( + '--initialize --init-steps=dtMg', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as multiple transactions}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side TEXT (multiple XACTs)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (text)'); + + +# Test client-side generation with COPY BINARY +$node->pgbench( + '--initialize --init-steps=dtc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY (single XACT #1)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary)'); + +$node->pgbench( + '--initialize --init-steps=dtSc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY (single XACT #2)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary)'); + +$node->pgbench( + '--initialize --init-steps=dtMc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as multiple transactions}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary)'); + + +# Check data state, after different modes of client-side data generation. +check_data_state($node, 'client-side (binary)'); + +$node->pgbench( + '--initialize --init-steps=dtMccSc', + 0, + [qr{^$}], + [ + qr{dropping old tables}, + qr{creating tables}, + qr{generating data \(client-side as multiple transactions}, + qr{generating data \(client-side as multiple transactions}, + qr{generating data \(client-side as single transaction}, + qr{done in \d+\.\d\d s } + ], + 'pgbench --init-steps client-side BINARY (multiple XACT modes)'); + +# Check data state, after client-side data generation. +check_data_state($node, 'client-side (binary different XACT modes in list of --init-steps)'); + + # Run all builtin scripts, for a few transactions each $node->pgbench( '--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t' -- 2.43.0