diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 1515ed405ba..6b89007a63b 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -161,7 +161,7 @@ typedef struct socket_set * some configurable parameters */ #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ -#define ALL_INIT_STEPS "dtgGvpf" /* all possible steps */ +#define ALL_INIT_STEPS "dtgCGiIvpf" /* all possible steps */ #define LOG_STEP_SECONDS 5 /* seconds between log messages */ #define DEFAULT_NXACTS 10 /* default nxacts */ @@ -171,6 +171,14 @@ typedef struct socket_set #define MIN_ZIPFIAN_PARAM 1.001 /* minimum parameter for zipfian */ #define MAX_ZIPFIAN_PARAM 1000.0 /* maximum parameter for zipfian */ +/* original single transaction server-side method */ +#define GEN_TYPE_INSERT_ORIGINAL 'G' /* use INSERT .. SELECT generate_series to generate data */ +/* 'one transaction per scale' server-side methods */ +#define GEN_TYPE_INSERT_SERIES 'i' /* use INSERT .. SELECT generate_series to generate data */ +#define GEN_TYPE_INSERT_UNNEST 'I' /* use INSERT .. SELECT unnest to generate data */ +#define GEN_TYPE_COPY_ORIGINAL 'g' /* use COPY .. FROM STDIN .. TEXT to generate data */ +#define GEN_TYPE_COPY_BINARY 'C' /* use COPY .. FROM STDIN .. BINARY to generate data */ + static int nxacts = 0; /* number of transactions per client */ static int duration = 0; /* duration in seconds */ static int64 end_time = 0; /* when to stop in micro seconds, under -T */ @@ -181,6 +189,18 @@ static int64 end_time = 0; /* when to stop in micro seconds, under -T */ */ static int scale = 1; +/* + * mode of data generation to use + */ +static char data_generation_type = '?'; + +/* + * COPY FROM BINARY execution buffer + */ +#define BIN_COPY_BUF_SIZE 102400 /* maximum buffer size for COPY FROM BINARY */ +static char *bin_copy_buffer = NULL; /* buffer for COPY FROM BINARY */ +static int32_t bin_copy_buffer_length = 0; /* current buffer size */ + /* * fillfactor. 
for example, fillfactor = 90 will use only 90 percent * space during inserts and leave 10 percent free. @@ -402,14 +422,15 @@ typedef struct StatsData * directly successful transactions (they were successfully completed on * the first try). * - * A failed transaction is defined as unsuccessfully retried transactions. - * It can be one of two types: - * - * failed (the number of failed transactions) = + * 'failed' (the number of failed transactions) = * 'serialization_failures' (they got a serialization error and were not - * successfully retried) + + * successfully retried) + * 'deadlock_failures' (they got a deadlock error and were not - * successfully retried). + * successfully retried) + + * 'other_sql_failures' (they failed on the first try or after retries + * due to a SQL error other than serialization or + * deadlock; they are counted as a failed transaction + * only when --continue-on-error is specified). * * If the transaction was retried after a serialization or a deadlock * error this does not guarantee that this retry was successful. Thus @@ -421,7 +442,7 @@ typedef struct StatsData * * 'retried' (number of all retried transactions) = * successfully retried transactions + - * failed transactions. + * unsuccessfully retried transactions. 
*---------- */ int64 cnt; /* number of successful transactions, not @@ -440,6 +461,11 @@ typedef struct StatsData int64 deadlock_failures; /* number of transactions that were not * successfully retried after a deadlock * error */ + int64 other_sql_failures; /* number of failed transactions for + * reasons other than + * serialization/deadlock failure, which + * is counted if --continue-on-error is + * specified */ SimpleStats latency; SimpleStats lag; } StatsData; @@ -457,6 +483,7 @@ typedef enum EStatus { ESTATUS_NO_ERROR = 0, ESTATUS_META_COMMAND_ERROR, + ESTATUS_CONN_ERROR, /* SQL errors */ ESTATUS_SERIALIZATION_ERROR, @@ -770,6 +797,7 @@ static int64 total_weight = 0; static bool verbose_errors = false; /* print verbose messages of all errors */ static bool exit_on_abort = false; /* exit when any client is aborted */ +static bool continue_on_error = false; /* continue after errors */ /* Builtin test scripts */ typedef struct BuiltinScript @@ -842,7 +870,8 @@ static int wait_on_socket_set(socket_set *sa, int64 usecs); static bool socket_has_input(socket_set *sa, int fd, int idx); /* callback used to build rows for COPY during data loading */ -typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr); +typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr); +typedef void (*initRowMethodBin) (PGconn *con, PGresult *res, int64_t curr, int32_t parent); /* callback functions for our flex lexer */ static const PsqlScanCallbacks pgbench_callbacks = { @@ -906,7 +935,10 @@ usage(void) " d: drop any existing pgbench tables\n" " t: create the tables used by the standard pgbench scenario\n" " g: generate data, client-side\n" - " G: generate data, server-side\n" + " C: client-side (single TNX) COPY .. FROM STDIN .. BINARY\n" + " G: generate data, server-side in single transaction\n" + " i: server-side (multiple TXNs) INSERT .. SELECT generate_series\n" + " I: server-side (multiple TXNs) INSERT .. 
SELECT unnest\n" " v: invoke VACUUM on the standard tables\n" " p: create primary key indexes on the standard tables\n" " f: create foreign keys between the standard tables\n" @@ -949,6 +981,7 @@ usage(void) " -T, --time=NUM duration of benchmark test in seconds\n" " -v, --vacuum-all vacuum all four standard tables before tests\n" " --aggregate-interval=NUM aggregate data over NUM seconds\n" + " --continue-on-error continue running after an SQL error\n" " --exit-on-abort exit when any client is aborted\n" " --failures-detailed report the failures grouped by basic types\n" " --log-prefix=PREFIX prefix for transaction time log file\n" @@ -1467,6 +1500,7 @@ initStats(StatsData *sd, pg_time_usec_t start) sd->retried = 0; sd->serialization_failures = 0; sd->deadlock_failures = 0; + sd->other_sql_failures = 0; initSimpleStats(&sd->latency); initSimpleStats(&sd->lag); } @@ -1516,6 +1550,9 @@ accumStats(StatsData *stats, bool skipped, double lat, double lag, case ESTATUS_DEADLOCK_ERROR: stats->deadlock_failures++; break; + case ESTATUS_OTHER_SQL_ERROR: + stats->other_sql_failures++; + break; default: /* internal error which should never occur */ pg_fatal("unexpected error status: %d", estatus); @@ -3231,11 +3268,43 @@ sendCommand(CState *st, Command *command) } /* - * Get the error status from the error code. + * Read and discard all available results from the connection. + */ +static void +discardAvailableResults(CState *st) +{ + PGresult *res = NULL; + + for (;;) + { + res = PQgetResult(st->con); + + /* + * Read and discard results until PQgetResult() returns NULL (no more + * results) or a connection failure is detected. If the pipeline + * status is PQ_PIPELINE_ABORTED, more results may still be available + * even after PQgetResult() returns NULL, so continue reading in that + * case. 
+ */ + if ((res == NULL && PQpipelineStatus(st->con) != PQ_PIPELINE_ABORTED) || + PQstatus(st->con) == CONNECTION_BAD) + break; + + PQclear(res); + } + PQclear(res); +} + +/* + * Determine the error status based on the connection status and error code. */ static EStatus -getSQLErrorStatus(const char *sqlState) +getSQLErrorStatus(CState *st, const char *sqlState) { + discardAvailableResults(st); + if (PQstatus(st->con) == CONNECTION_BAD) + return ESTATUS_CONN_ERROR; + if (sqlState != NULL) { if (strcmp(sqlState, ERRCODE_T_R_SERIALIZATION_FAILURE) == 0) @@ -3257,6 +3326,17 @@ canRetryError(EStatus estatus) estatus == ESTATUS_DEADLOCK_ERROR); } +/* + * Returns true if --continue-on-error is specified and this error allows + * processing to continue. + */ +static bool +canContinueOnError(EStatus estatus) +{ + return (continue_on_error && + estatus == ESTATUS_OTHER_SQL_ERROR); +} + /* * Process query response from the backend. * @@ -3375,9 +3455,9 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) case PGRES_NONFATAL_ERROR: case PGRES_FATAL_ERROR: - st->estatus = getSQLErrorStatus(PQresultErrorField(res, - PG_DIAG_SQLSTATE)); - if (canRetryError(st->estatus)) + st->estatus = getSQLErrorStatus(st, PQresultErrorField(res, + PG_DIAG_SQLSTATE)); + if (canRetryError(st->estatus) || canContinueOnError(st->estatus)) { if (verbose_errors) commandError(st, PQresultErrorMessage(res)); @@ -3409,11 +3489,7 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix) error: PQclear(res); PQclear(next_res); - do - { - res = PQgetResult(st->con); - PQclear(res); - } while (res); + discardAvailableResults(st); return false; } @@ -3511,14 +3587,18 @@ doRetry(CState *st, pg_time_usec_t *now) } /* - * Read results and discard it until a sync point. + * Read and discard results until the last sync point. 
*/ static int discardUntilSync(CState *st) { bool received_sync = false; - /* send a sync */ + /* + * Send a Sync message to ensure at least one PGRES_PIPELINE_SYNC is + * received and to avoid an infinite loop, since all earlier ones may have + * already been received. + */ if (!PQpipelineSync(st->con)) { pg_log_error("client %d aborted: failed to send a pipeline sync", @@ -3526,29 +3606,42 @@ discardUntilSync(CState *st) return 0; } - /* receive PGRES_PIPELINE_SYNC and null following it */ + /* + * Continue reading results until the last sync point, i.e., until + * reaching null just after PGRES_PIPELINE_SYNC. + */ for (;;) { PGresult *res = PQgetResult(st->con); + if (PQstatus(st->con) == CONNECTION_BAD) + { + pg_log_error("client %d aborted while rolling back the transaction after an error; perhaps the backend died while processing", + st->id); + PQclear(res); + return 0; + } + if (PQresultStatus(res) == PGRES_PIPELINE_SYNC) received_sync = true; - else if (received_sync) + else if (received_sync && res == NULL) { - /* - * PGRES_PIPELINE_SYNC must be followed by another - * PGRES_PIPELINE_SYNC or NULL; otherwise, assert failure. - */ - Assert(res == NULL); - /* * Reset ongoing sync count to 0 since all PGRES_PIPELINE_SYNC * results have been discarded. */ st->num_syncs = 0; - PQclear(res); break; } + else + { + /* + * If a PGRES_PIPELINE_SYNC is followed by something other than + * PGRES_PIPELINE_SYNC or NULL, another PGRES_PIPELINE_SYNC will + * appear later. Reset received_sync to false to wait for it. 
+ */ + received_sync = false; + } PQclear(res); } @@ -4041,7 +4134,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON) st->state = CSTATE_END_COMMAND; } - else if (canRetryError(st->estatus)) + else if (canRetryError(st->estatus) || canContinueOnError(st->estatus)) st->state = CSTATE_ERROR; else st->state = CSTATE_ABORTED; @@ -4562,7 +4655,8 @@ static int64 getFailures(const StatsData *stats) { return (stats->serialization_failures + - stats->deadlock_failures); + stats->deadlock_failures + + stats->other_sql_failures); } /* @@ -4582,6 +4676,8 @@ getResultString(bool skipped, EStatus estatus) return "serialization"; case ESTATUS_DEADLOCK_ERROR: return "deadlock"; + case ESTATUS_OTHER_SQL_ERROR: + return "other"; default: /* internal error which should never occur */ pg_fatal("unexpected error status: %d", estatus); @@ -4637,6 +4733,7 @@ doLog(TState *thread, CState *st, int64 skipped = 0; int64 serialization_failures = 0; int64 deadlock_failures = 0; + int64 other_sql_failures = 0; int64 retried = 0; int64 retries = 0; @@ -4677,10 +4774,12 @@ doLog(TState *thread, CState *st, { serialization_failures = agg->serialization_failures; deadlock_failures = agg->deadlock_failures; + other_sql_failures = agg->other_sql_failures; } - fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT, + fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT " " INT64_FORMAT, serialization_failures, - deadlock_failures); + deadlock_failures, + other_sql_failures); fputc('\n', logfile); @@ -4886,26 +4985,26 @@ initCreateTables(PGconn *con) static const struct ddlinfo DDLs[] = { { "pgbench_history", - "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)", - "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)", + "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22) default ''", + "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22) default ''", 0 }, { 
"pgbench_tellers", - "tid int not null,bid int,tbalance int,filler char(84)", - "tid int not null,bid int,tbalance int,filler char(84)", + "tid int not null,bid int,tbalance int,filler char(84) default ''", + "tid int not null,bid int,tbalance int,filler char(84) default ''", 1 }, { "pgbench_accounts", - "aid int not null,bid int,abalance int,filler char(84)", - "aid bigint not null,bid int,abalance int,filler char(84)", + "aid int not null,bid int,abalance int,filler char(84) default ''", + "aid bigint not null,bid int,abalance int,filler char(84) default ''", 1 }, { "pgbench_branches", - "bid int not null,bbalance int,filler char(88)", - "bid int not null,bbalance int,filler char(88)", + "bid int not null,bbalance int,filler char(88) default ''", + "bid int not null,bbalance int,filler char(88) default ''", 1 } }; @@ -5120,9 +5219,9 @@ initPopulateTable(PGconn *con, const char *table, int64 base, * a blank-padded string in pgbench_accounts. */ static void -initGenerateDataClientSide(PGconn *con) +initGenerateDataClientSideText(PGconn *con) { - fprintf(stderr, "generating data (client-side)...\n"); + fprintf(stderr, "TEXT mode...\n"); /* * we do all of this in one transaction to enable the backend's @@ -5138,25 +5237,389 @@ initGenerateDataClientSide(PGconn *con) * already exist */ initPopulateTable(con, "pgbench_branches", nbranches, initBranch); - initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); + initPopulateTable(con, "pgbench_tellers", ntellers, initTeller); initPopulateTable(con, "pgbench_accounts", naccounts, initAccount); executeStatement(con, "commit"); } + /* - * Fill the standard tables with some data generated on the server - * - * As already the case with the client-side data generation, the filler - * column defaults to NULL in pgbench_branches and pgbench_tellers, - * and is a blank-padded string in pgbench_accounts. 
+ * Dumps binary buffer to file (purely for debugging) */ static void -initGenerateDataServerSide(PGconn *con) +dumpBufferToFile(char *filename) +{ + FILE *file_ptr; + size_t bytes_written; + + file_ptr = fopen(filename, "wb"); + if (file_ptr == NULL) + { + fprintf(stderr, "Error opening file %s\n", filename); + return; // EXIT_FAILURE; + } + + bytes_written = fwrite(bin_copy_buffer, 1, bin_copy_buffer_length, file_ptr); + + if (bytes_written != bin_copy_buffer_length) + { + fprintf(stderr, "Error writing to file or incomplete write\n"); + fclose(file_ptr); + return; // EXIT_FAILURE; + } + + fclose(file_ptr); +} + +/* + * Save char data to buffer + */ +static void +bufferCharData(char *src, int32_t len) +{ + memcpy((char *) bin_copy_buffer + bin_copy_buffer_length, (char *) src, len); + bin_copy_buffer_length += len; +} + +/* + * Converts platform byte order into network byte order + * SPARC doesn't reqire that + */ +static void +bufferData(void *src, int32_t len) +{ +#ifdef __sparc__ + bufferCharData(src, len); +#else + if (len == 1) + bufferCharData(src, len); + else + for (int32_t i = 0; i < len; i++) + { + ((char *) bin_copy_buffer + bin_copy_buffer_length)[i] = + ((char *) src)[len - i - 1]; + } + + bin_copy_buffer_length += len; +#endif +} + +/* + * adds column counter + */ +static void +addColumnCounter(int16_t n) +{ + bufferData((void *) &n, sizeof(n)); +} + +/* + * adds column with NULL value + */ +static void +addNullColumn() +{ + int32_t null = -1; + bufferData((void *) &null, sizeof(null)); +} + +/* + * adds column with int8 value + */ +static void +addInt8Column(int8_t value) +{ + int8_t data = value; + int32_t size = sizeof(data); + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); +} + +/* + * adds column with int16 value + */ +static void +addInt16Column(int16_t value) +{ + int16_t data = value; + int32_t size = sizeof(data); + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); 
+} + +/* + * adds column with inti32 value + */ +static void +addInt32Column(int32_t value) +{ + int32_t data = value; + int32_t size = sizeof(data); + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); +} + +/* + * adds column with inti64 value + */ +static void +addInt64Column(int64_t value) +{ + int64_t data = value; + int32_t size = sizeof(data); + bufferData((void *) &size, sizeof(size)); + bufferData((void *) &data, sizeof(data)); +} + +/* + * adds column with char value + */ +static void +addCharColumn(char *value) +{ + int32_t size = strlen(value); + bufferData((void *) &size, sizeof(size)); + bufferCharData(value, size); +} + +/* + * Starts communication with server for COPY FROM BINARY statement + */ +static void +sendBinaryCopyHeader(PGconn *con) +{ + char header[] = {'P','G','C','O','P','Y','\n','\377','\r','\n','\0', + '\0','\0','\0','\0', + '\0','\0','\0','\0' }; + + PQputCopyData(con, header, 19); +} + +/* + * Finishes communication with server for COPY FROM BINARY statement + */ +static void +sendBinaryCopyTrailer(PGconn *con) +{ + static char trailer[] = { 0xFF, 0xFF }; + + PQputCopyData(con, trailer, 2); +} + +/* + * Flashes current buffer over network if needed + */ +static void +flushBuffer(PGconn *con, PGresult *res, int16_t row_len) +{ + if (bin_copy_buffer_length + row_len > BIN_COPY_BUF_SIZE) + { + /* flush current buffer */ + if (PQresultStatus(res) == PGRES_COPY_IN) + PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length); + bin_copy_buffer_length = 0; + } +} + +/* + * Sends current branch row to buffer + */ +static void +initBranchBinary(PGconn *con, PGresult *res, int64_t curr, int32_t parent) +{ + /* + * Each row has following extra bytes: + * - 2 bytes for number of columns + * - 4 bytes as length for each column + */ + int16_t max_row_len = 35 + 2 + 4*3; /* max row size is 32 */ + + flushBuffer(con, res, max_row_len); + + addColumnCounter(2); + + addInt32Column(curr + 1); + 
addInt32Column(0); +} + +/* + * Sends current teller row to buffer + */ +static void +initTellerBinary(PGconn *con, PGresult *res, int64_t curr, int32_t parent) +{ + /* + * Each row has following extra bytes: + * - 2 bytes for number of columns + * - 4 bytes as length for each column + */ + int16_t max_row_len = 40 + 2 + 4*4; /* max row size is 40 */ + + flushBuffer(con, res, max_row_len); + + addColumnCounter(3); + + addInt32Column(curr + 1); + addInt32Column(curr / parent + 1); + addInt32Column(0); +} + +/* + * Sends current account row to buffer + */ +static void +initAccountBinary(PGconn *con, PGresult *res, int64_t curr, int32_t parent) +{ + /* + * Each row has following extra bytes: + * - 2 bytes for number of columns + * - 4 bytes as length for each column + */ + int16_t max_row_len = 250 + 2 + 4*4; /* max row size is 250 for int64 */ + + flushBuffer(con, res, max_row_len); + + addColumnCounter(3); + + if (scale <= SCALE_32BIT_THRESHOLD) + addInt32Column(curr + 1); + else + addInt64Column(curr); + + addInt32Column(curr / parent + 1); + addInt32Column(0); +} + +/* + * Universal wrapper for sending data in binary format + */ +static void +initPopulateTableBinary(PGconn *con, char *table, char *columns, + int counter, int64_t base, initRowMethodBin init_row) +{ + int n; + PGresult *res; + char copy_statement[256]; + const char *copy_statement_fmt = "copy %s (%s) from stdin (format binary)"; + int64_t start = base * counter; + + bin_copy_buffer_length = 0; + + /* Use COPY with FREEZE on v14 and later for all ordinary tables */ + if ((PQserverVersion(con) >= 140000) && + get_table_relkind(con, table) == RELKIND_RELATION) + copy_statement_fmt = "copy %s (%s) from stdin with (format binary)"; + + n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table, columns); + if (n >= sizeof(copy_statement)) + pg_fatal("invalid buffer size: must be at least %d characters long", n); + else if (n == -1) + pg_fatal("invalid format string"); + + res = 
PQexec(con, copy_statement); + + if (PQresultStatus(res) != PGRES_COPY_IN) + pg_fatal("unexpected copy in result: %s", PQerrorMessage(con)); + PQclear(res); + + + sendBinaryCopyHeader(con); + + for (int64_t i = start; i < start + base; i++) + { + init_row(con, res, i, base); + } + + if (PQresultStatus(res) == PGRES_COPY_IN) + PQputCopyData(con, (char *) bin_copy_buffer, bin_copy_buffer_length); + else + fprintf(stderr, "Unexpected mode %d instead of %d\n", PQresultStatus(res), PGRES_COPY_IN); + + sendBinaryCopyTrailer(con); + + if (PQresultStatus(res) == PGRES_COPY_IN) + { + if (PQputCopyEnd(con, NULL) == 1) /* success */ + { + res = PQgetResult(con); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + fprintf(stderr, "Error: %s\n", PQerrorMessage(con)); + PQclear(res); + } + else + fprintf(stderr, "Error: %s\n", PQerrorMessage(con)); + } +} + +/* + * Wrapper for binary data load + */ +static void +initGenerateDataClientSideBinary(PGconn *con) +{ + + fprintf(stderr, "BINARY mode...\n"); + + bin_copy_buffer = pg_malloc(BIN_COPY_BUF_SIZE); + bin_copy_buffer_length = 0; + + /* + * we do all of this in one transaction to enable the backend's + * data-loading optimizations + */ + executeStatement(con, "begin"); + + /* truncate away any old data */ + initTruncateTables(con); + + executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + initPopulateTableBinary(con, "pgbench_branches", "bid, bbalance", + i, nbranches, initBranchBinary); + initPopulateTableBinary(con, "pgbench_tellers", "tid, bid, tbalance", + i, ntellers, initTellerBinary); + initPopulateTableBinary(con, "pgbench_accounts", "aid, bid, abalance", + i, naccounts, initAccountBinary); + + executeStatement(con, "commit"); + } + + pg_free(bin_copy_buffer); +} + +/* + * Fill the standard tables with some data generated and sent from the client. 
+ */ +static void +initGenerateDataClientSide(PGconn *con) +{ + fprintf(stderr, "generating data (client-side) in "); + + switch (data_generation_type) + { + case GEN_TYPE_COPY_ORIGINAL: + initGenerateDataClientSideText(con); + break; + case GEN_TYPE_COPY_BINARY: + initGenerateDataClientSideBinary(con); + break; + } +} + +/* + * Generating data via INSERT .. SELECT .. FROM generate_series + * whole dataset in single transaction + */ +static void +generateDataInsertSingleTXN(PGconn *con) { PQExpBufferData sql; - fprintf(stderr, "generating data (server-side)...\n"); + fprintf(stderr, "via INSERT .. SELECT generate_series... in single TXN\n"); + /* * we do all of this in one transaction to enable the backend's @@ -5170,27 +5633,166 @@ initGenerateDataServerSide(PGconn *con) initPQExpBuffer(&sql); printfPQExpBuffer(&sql, - "insert into pgbench_branches(bid,bbalance) " + "insert into pgbench_branches(bid, bbalance) " "select bid, 0 " - "from generate_series(1, %d) as bid", nbranches * scale); + "from generate_series(1, %d) as bid", + scale * nbranches); executeStatement(con, sql.data); printfPQExpBuffer(&sql, - "insert into pgbench_tellers(tid,bid,tbalance) " - "select tid, (tid - 1) / %d + 1, 0 " - "from generate_series(1, %d) as tid", ntellers, ntellers * scale); + "insert into pgbench_tellers(tid, bid, tbalance) " + "select tid + 1, tid / %d + 1, 0 " + "from generate_series(0, %d) as tid", + ntellers, (scale * ntellers) - 1); executeStatement(con, sql.data); printfPQExpBuffer(&sql, - "insert into pgbench_accounts(aid,bid,abalance,filler) " - "select aid, (aid - 1) / %d + 1, 0, '' " - "from generate_series(1, " INT64_FORMAT ") as aid", - naccounts, (int64) naccounts * scale); + "insert into pgbench_accounts(aid, bid, abalance, " + "filler) " + "select aid + 1, aid / %d + 1, 0, '' " + "from generate_series(0, " INT64_FORMAT ") as aid", + naccounts, (int64) (scale * naccounts) - 1); executeStatement(con, sql.data); + executeStatement(con, "commit"); + 
termPQExpBuffer(&sql); +} + + +/* + * Generating data via INSERT .. SELECT .. FROM generate_series + * One transaction per 'scale' + */ +static void +generateDataInsertSeries(PGconn *con) +{ + PQExpBufferData sql; + + fprintf(stderr, "via INSERT .. SELECT generate_series... in multiple TXN(s)\n"); + + initPQExpBuffer(&sql); + + executeStatement(con, "begin"); + + /* truncate away any old data */ + initTruncateTables(con); executeStatement(con, "commit"); + + for (int i = 0; i < scale; i++) + { + executeStatement(con, "begin"); + + printfPQExpBuffer(&sql, + "insert into pgbench_branches(bid, bbalance) " + "values(%d, 0)", i + 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_tellers(tid, bid, tbalance) " + "select tid + 1, tid / %d + 1, 0 " + "from generate_series(%d, %d) as tid", + ntellers, i * ntellers, (i + 1) * ntellers - 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_accounts(aid, bid, abalance, " + "filler) " + "select aid + 1, aid / %d + 1, 0, '' " + "from generate_series(" INT64_FORMAT ", " + INT64_FORMAT ") as aid", + naccounts, (int64) i * naccounts, + (int64) (i + 1) * naccounts - 1); + executeStatement(con, sql.data); + + executeStatement(con, "commit"); + } + + termPQExpBuffer(&sql); +} + +/* + * Generating data via INSERT .. SELECT .. FROM unnest + * One transaction per 'scale' + */ +static void +generateDataInsertUnnest(PGconn *con) +{ + PQExpBufferData sql; + + fprintf(stderr, "via INSERT .. 
SELECT unnest...\n"); + + initPQExpBuffer(&sql); + + executeStatement(con, "begin"); + + /* truncate away any old data */ + initTruncateTables(con); + + executeStatement(con, "commit"); + + for (int s = 0; s < scale; s++) + { + executeStatement(con, "begin"); + + printfPQExpBuffer(&sql, + "insert into pgbench_branches(bid,bbalance) " + "values(%d, 0)", s + 1); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "insert into pgbench_tellers(tid, bid, tbalance) " + "select unnest(array_agg(s.i order by s.i)) as tid, " + "%d as bid, 0 as tbalance " + "from generate_series(%d, %d) as s(i)", + s + 1, s * ntellers + 1, (s + 1) * ntellers); + executeStatement(con, sql.data); + + printfPQExpBuffer(&sql, + "with data as (" + " select generate_series(" INT64_FORMAT ", " + INT64_FORMAT ") as i) " + "insert into pgbench_accounts(aid, bid, " + "abalance, filler) " + "select unnest(aid), unnest(bid), 0 as abalance, " + "'' as filler " + "from (select array_agg(i+1) aid, " + "array_agg(i/%d + 1) bid from data)", + (int64) s * naccounts + 1, + (int64) (s + 1) * naccounts, naccounts); + executeStatement(con, sql.data); + + executeStatement(con, "commit"); + } + + termPQExpBuffer(&sql); +} + +/* + * Fill the standard tables with some data generated on the server + * + * As already the case with the client-side data generation, the filler + * column defaults to NULL in pgbench_branches and pgbench_tellers, + * and is a blank-padded string in pgbench_accounts. 
+ */ +static void +initGenerateDataServerSide(PGconn *con) +{ + fprintf(stderr, "generating data (server-side) "); + + switch (data_generation_type) + { + case GEN_TYPE_INSERT_ORIGINAL: + generateDataInsertSingleTXN(con); + break; + case GEN_TYPE_INSERT_SERIES: + generateDataInsertSeries(con); + break; + case GEN_TYPE_INSERT_UNNEST: + generateDataInsertUnnest(con); + break; + } } /* @@ -5276,6 +5878,8 @@ initCreateFKeys(PGconn *con) static void checkInitSteps(const char *initialize_steps) { + char data_init_type = 0; + if (initialize_steps[0] == '\0') pg_fatal("no initialization steps specified"); @@ -5287,7 +5891,22 @@ checkInitSteps(const char *initialize_steps) pg_log_error_detail("Allowed step characters are: \"" ALL_INIT_STEPS "\"."); exit(1); } + + switch (*step) + { + case 'g': + case 'C': + case 'G': + case 'i': + case 'I': + data_init_type++; + data_generation_type = *step; + break; + } } + + if (data_init_type > 1) + pg_log_error("WARNING! More than one type of server-side data generation is requested"); } /* @@ -5326,10 +5945,13 @@ runInitSteps(const char *initialize_steps) initCreateTables(con); break; case 'g': + case 'C': op = "client-side generate"; initGenerateDataClientSide(con); break; case 'G': + case 'i': + case 'I': op = "server-side generate"; initGenerateDataServerSide(con); break; @@ -6319,6 +6941,7 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now, cur.serialization_failures += threads[i].stats.serialization_failures; cur.deadlock_failures += threads[i].stats.deadlock_failures; + cur.other_sql_failures += threads[i].stats.other_sql_failures; } /* we count only actually executed transactions */ @@ -6461,7 +7084,8 @@ printResults(StatsData *total, /* * Remaining stats are nonsensical if we failed to execute any xacts due - * to others than serialization or deadlock errors + * to other than serialization or deadlock errors and --continue-on-error + * is not set. 
*/ if (total_cnt <= 0) return; @@ -6477,6 +7101,9 @@ printResults(StatsData *total, printf("number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n", total->deadlock_failures, 100.0 * total->deadlock_failures / total_cnt); + printf("number of other failures: " INT64_FORMAT " (%.3f%%)\n", + total->other_sql_failures, + 100.0 * total->other_sql_failures / total_cnt); } /* it can be non-zero only if max_tries is not equal to one */ @@ -6580,6 +7207,10 @@ printResults(StatsData *total, sstats->deadlock_failures, (100.0 * sstats->deadlock_failures / script_total_cnt)); + printf(" - number of other failures: " INT64_FORMAT " (%.3f%%)\n", + sstats->other_sql_failures, + (100.0 * sstats->other_sql_failures / + script_total_cnt)); } /* @@ -6739,6 +7370,7 @@ main(int argc, char **argv) {"verbose-errors", no_argument, NULL, 15}, {"exit-on-abort", no_argument, NULL, 16}, {"debug", no_argument, NULL, 17}, + {"continue-on-error", no_argument, NULL, 18}, {NULL, 0, NULL, 0} }; @@ -7092,6 +7724,10 @@ main(int argc, char **argv) case 17: /* debug */ pg_logging_increase_verbosity(); break; + case 18: /* continue-on-error */ + benchmarking_option_set = true; + continue_on_error = true; + break; default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -7447,6 +8083,7 @@ main(int argc, char **argv) stats.retried += thread->stats.retried; stats.serialization_failures += thread->stats.serialization_failures; stats.deadlock_failures += thread->stats.deadlock_failures; + stats.other_sql_failures += thread->stats.other_sql_failures; latency_late += thread->latency_late; conn_total_duration += thread->conn_duration;