diff options
Diffstat (limited to 'contrib/pgbench/pgbench.c')
-rw-r--r-- | contrib/pgbench/pgbench.c | 1282 |
1 files changed, 1031 insertions, 251 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c index 24b0c7e105..534fd72150 100644 --- a/contrib/pgbench/pgbench.c +++ b/contrib/pgbench/pgbench.c @@ -5,7 +5,7 @@ * Originally written by Tatsuo Ishii and enhanced by many contributors. * * contrib/pgbench/pgbench.c - * Copyright (c) 2000-2012, PostgreSQL Global Development Group + * Copyright (c) 2000-2014, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -35,16 +35,12 @@ #include "getopt_long.h" #include "libpq-fe.h" -#include "libpq/pqsignal.h" #include "portability/instr_time.h" #include <ctype.h> - -#ifndef WIN32 +#include <math.h> +#include <signal.h> #include <sys/time.h> -#include <unistd.h> -#endif /* ! WIN32 */ - #ifdef HAVE_SYS_SELECT_H #include <sys/select.h> #endif @@ -73,7 +69,7 @@ static int pthread_join(pthread_t th, void **thread_return); #include <pthread.h> #else /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */ - +#define PTHREAD_FORK_EMULATION #include <sys/wait.h> #define pthread_t pg_pthread_t @@ -88,9 +84,6 @@ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start static int pthread_join(pthread_t th, void **thread_return); #endif -extern char *optarg; -extern int optind; - /******************************************************************** * some configurable parameters */ @@ -102,6 +95,7 @@ extern int optind; #define MAXCLIENTS 1024 #endif +#define LOG_STEP_SECONDS 5 /* seconds between log messages */ #define DEFAULT_NXACTS 10 /* default nxacts */ int nxacts = 0; /* number of transactions per client */ @@ -120,11 +114,27 @@ int scale = 1; int fillfactor = 100; /* + * create foreign key constraints on the tables? + */ +int foreign_keys = 0; + +/* * use unlogged tables? */ int unlogged_tables = 0; /* + * log sampling rate (1.0 = log everything, 0.0 = option not given) + */ +double sample_rate = 0.0; + +/* + * When threads are throttled to a given rate limit, this is the target delay + * to reach that rate in usec. 0 is the default and means no throttling. + */ +int64 throttle_delay = 0; + +/* * tablespace selection */ char *tablespace = NULL; @@ -142,17 +152,33 @@ char *index_tablespace = NULL; #ifdef PGXC bool use_branch = false; /* use branch id in DDL and DML */ #endif +/* + * The scale factor at/beyond which 32bit integers are incapable of storing + * 64bit values. + * + * Although the actual threshold is 21474, we use 20000 because it is easier to + * document and remember, and isn't that far away from the real threshold. + */ +#define SCALE_32BIT_THRESHOLD 20000 + bool use_log; /* log transaction latencies to a file */ +bool use_quiet; /* quiet logging onto stderr */ +int agg_interval; /* log aggregates instead of individual + * transactions */ +int progress = 0; /* thread progress report every this seconds */ +int progress_nclients = 0; /* number of clients for progress + * report */ +int progress_nthreads = 0; /* number of threads for progress + * report */ bool is_connect; /* establish connection for each transaction */ bool is_latencies; /* report per-command latencies */ int main_pid; /* main process id used in log filename */ char *pghost = ""; char *pgport = ""; -char *pgoptions = NULL; -char *pgtty = NULL; char *login = NULL; char *dbName; +const char *progname; volatile bool timer_exceeded = false; /* flag from signal handler */ @@ -180,11 +206,15 @@ typedef struct int listen; /* 0 indicates that an async query has been * sent */ int sleeping; /* 1 indicates that the client is napping */ + bool throttling; /* whether nap is for throttling */ int64 until; /* napping until (usec) */ Variable *variables; /* array of variable definitions */ int nvariables; instr_time txn_begin; /* used for measuring transaction latencies */ instr_time stmt_begin; /* used for measuring statement latencies */ + int64 txn_latencies; /* cumulated latencies */ + int64 txn_sqlats; /* cumulated square latencies */ + bool is_throttled; /* whether transaction throttling is done */ int use_file; /* index in sql_files for this client */ bool prepared[MAX_FILES]; } CState; @@ -202,6 +232,9 @@ typedef struct instr_time *exec_elapsed; /* time spent executing cmds (per Command) */ int *exec_count; /* number of cmd executions (per Command) */ unsigned short random_state[3]; /* separate randomness for each thread */ + int64 throttle_trigger; /* previous/next throttling (us) */ + int64 throttle_lag; /* total transaction lag behind throttling */ + int64 throttle_lag_max; /* max transaction lag */ } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -209,7 +242,11 @@ typedef struct typedef struct { instr_time conn_time; - int xacts; + int64 xacts; + int64 latencies; + int64 sqlats; + int64 throttle_lag; + int64 throttle_lag_max; } TResult; /* @@ -239,6 +276,19 @@ typedef struct char *argv[MAX_ARGS]; /* command word list */ } Command; +typedef struct +{ + + long start_time; /* when does the interval start */ + int cnt; /* number of transactions */ + double min_duration; /* min/max durations */ + double max_duration; + double sum; /* sum(duration), sum(duration^2) - for + * estimates */ + double sum2; + +} AggVals; + static Command **sql_files[MAX_FILES]; /* SQL script files */ static int num_files; /* number of script files */ static int num_commands = 0; /* total number of Command structs */ @@ -326,108 +376,134 @@ static char *select_only = { static void setalarm(int seconds); static void *threadRun(void *arg); - -/* - * routines to check mem allocations and fail noisily. - */ -static void * -xmalloc(size_t size) -{ - void *result; - - result = malloc(size); - if (!result) - { - fprintf(stderr, "out of memory\n"); - exit(1); - } - return result; -} - -static void * -xrealloc(void *ptr, size_t size) -{ - void *result; - - result = realloc(ptr, size); - if (!result) - { - fprintf(stderr, "out of memory\n"); - exit(1); - } - return result; -} - -static char * -xstrdup(const char *s) -{ - char *result; - - result = strdup(s); - if (!result) - { - fprintf(stderr, "out of memory\n"); - exit(1); - } - return result; -} - - static void -usage(const char *progname) +usage(void) { printf("%s is a benchmarking tool for PostgreSQL.\n\n" "Usage:\n" " %s [OPTION]... [DBNAME]\n" "\nInitialization options:\n" - " -i invokes initialization mode\n" - " -F NUM fill factor\n" + " -i, --initialize invokes initialization mode\n" + " -F, --fillfactor=NUM set fill factor\n" #ifdef PGXC " -k distribute by primary key branch id - bid\n" #endif - " -s NUM scaling factor\n" + " -n, --no-vacuum do not run VACUUM after initialization\n" + " -q, --quiet quiet logging (one message each 5 seconds)\n" + " -s, --scale=NUM scaling factor\n" + " --foreign-keys create foreign key constraints between tables\n" " --index-tablespace=TABLESPACE\n" - " create indexes in the specified tablespace\n" - " --tablespace=TABLESPACE\n" - " create tables in the specified tablespace\n" - " --unlogged-tables\n" - " create tables as unlogged tables\n" + " create indexes in the specified tablespace\n" + " --tablespace=TABLESPACE create tables in the specified tablespace\n" + " --unlogged-tables create tables as unlogged tables\n" "\nBenchmarking options:\n" - " -c NUM number of concurrent database clients (default: 1)\n" - " -C establish new connection for each transaction\n" - " -D VARNAME=VALUE\n" - " define variable for use by custom script\n" - " -f FILENAME read transaction script from FILENAME\n" + " -c, --client=NUM number of concurrent database clients (default: 1)\n" + " -C, --connect establish new connection for each transaction\n" + " -D, --define=VARNAME=VALUE\n" + " define variable for use by custom script\n" + " -f, --file=FILENAME read transaction script from FILENAME\n" #ifdef PGXC " -k query with default key and additional key branch id (bid)\n" #endif - " -j NUM number of threads (default: 1)\n" - " -l write transaction times to log file\n" - " -M simple|extended|prepared\n" - " protocol for submitting queries to server (default: simple)\n" - " -n do not run VACUUM before tests\n" - " -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n" - " -r report average latency per command\n" - " -s NUM report this scale factor in output\n" - " -S perform SELECT-only transactions\n" - " -t NUM number of transactions each client runs (default: 10)\n" - " -T NUM duration of benchmark test in seconds\n" - " -v vacuum all four standard tables before tests\n" + " -j, --jobs=NUM number of threads (default: 1)\n" + " -l, --log write transaction times to log file\n" + " -M, --protocol=simple|extended|prepared\n" + " protocol for submitting queries (default: simple)\n" + " -n, --no-vacuum do not run VACUUM before tests\n" + " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n" + " -P, --progress=NUM show thread progress report every NUM seconds\n" + " -r, --report-latencies report average latency per command\n" + " -R, --rate=NUM target rate in transactions per second\n" + " -s, --scale=NUM report this scale factor in output\n" + " -S, --select-only perform SELECT-only transactions\n" + " -t, --transactions=NUM number of transactions each client runs (default: 10)\n" + " -T, --time=NUM duration of benchmark test in seconds\n" + " -v, --vacuum-all vacuum all four standard tables before tests\n" + " --aggregate-interval=NUM aggregate data over NUM seconds\n" + " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n" "\nCommon options:\n" - " -d print debugging output\n" - " -h HOSTNAME database server host or socket directory\n" - " -p PORT database server port number\n" - " -U USERNAME connect as specified database user\n" - " --help show this help, then exit\n" - " --version output version information, then exit\n" + " -d, --debug print debugging output\n" + " -h, --host=HOSTNAME database server host or socket directory\n" + " -p, --port=PORT database server port number\n" + " -U, --username=USERNAME connect as specified database user\n" + " -V, --version output version information, then exit\n" + " -?, --help show this help, then exit\n" "\n" "Report bugs to <pgsql-bugs@postgresql.org>.\n", progname, progname); } +/* + * strtoint64 -- convert a string to 64-bit integer + * + * This function is a modified version of scanint8() from + * src/backend/utils/adt/int8.c. + */ +static int64 +strtoint64(const char *str) +{ + const char *ptr = str; + int64 result = 0; + int sign = 1; + + /* + * Do our own scan, rather than relying on sscanf which might be broken + * for long long. + */ + + /* skip leading spaces */ + while (*ptr && isspace((unsigned char) *ptr)) + ptr++; + + /* handle sign */ + if (*ptr == '-') + { + ptr++; + + /* + * Do an explicit check for INT64_MIN. Ugly though this is, it's + * cleaner than trying to get the loop below to handle it portably. + */ + if (strncmp(ptr, "9223372036854775808", 19) == 0) + { + result = -INT64CONST(0x7fffffffffffffff) - 1; + ptr += 19; + goto gotdigits; + } + sign = -1; + } + else if (*ptr == '+') + ptr++; + + /* require at least one digit */ + if (!isdigit((unsigned char) *ptr)) + fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str); + + /* process digits */ + while (*ptr && isdigit((unsigned char) *ptr)) + { + int64 tmp = result * 10 + (*ptr++ - '0'); + + if ((tmp / 10) != result) /* overflow? */ + fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str); + result = tmp; + } + +gotdigits: + + /* allow trailing whitespace, but not other trailing chars */ + while (*ptr != '\0' && isspace((unsigned char) *ptr)) + ptr++; + + if (*ptr != '\0') + fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str); + + return ((sign < 0) ? -result : result); +} + /* random number generator: uniform distribution from min to max inclusive */ -static int -getrand(TState *thread, int min, int max) +static int64 +getrand(TState *thread, int64 min, int64 max) { /* * Odd coding is so that min and max have approximately the same chance of @@ -438,7 +514,7 @@ getrand(TState *thread, int min, int max) * protected by a mutex, and therefore a bottleneck on machines with many * CPUs. */ - return min + (int) ((max - min + 1) * pg_erand48(thread->random_state)); + return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state)); } /* call PQexec() and exit() on failure */ @@ -470,10 +546,30 @@ doConnect(void) */ do { +#define PARAMS_ARRAY_SIZE 7 + + const char *keywords[PARAMS_ARRAY_SIZE]; + const char *values[PARAMS_ARRAY_SIZE]; + + keywords[0] = "host"; + values[0] = pghost; + keywords[1] = "port"; + values[1] = pgport; + keywords[2] = "user"; + values[2] = login; + keywords[3] = "password"; + values[3] = password; + keywords[4] = "dbname"; + values[4] = dbName; + keywords[5] = "fallback_application_name"; + values[5] = progname; + keywords[6] = NULL; + values[6] = NULL; + new_pass = false; - conn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, - login, password); + conn = PQconnectdbParams(keywords, values, true); + if (!conn) { fprintf(stderr, "Connection to database \"%s\" failed\n", @@ -593,17 +689,17 @@ putVariable(CState *st, const char *context, char *name, char *value) } if (st->variables) - newvars = (Variable *) xrealloc(st->variables, + newvars = (Variable *) pg_realloc(st->variables, (st->nvariables + 1) * sizeof(Variable)); else - newvars = (Variable *) xmalloc(sizeof(Variable)); + newvars = (Variable *) pg_malloc(sizeof(Variable)); st->variables = newvars; var = &newvars[st->nvariables]; - var->name = xstrdup(name); - var->value = xstrdup(value); + var->name = pg_strdup(name); + var->value = pg_strdup(value); st->nvariables++; @@ -615,7 +711,7 @@ putVariable(CState *st, const char *context, char *name, char *value) char *val; /* dup then free, in case value is pointing at this variable */ - val = xstrdup(value); + val = pg_strdup(value); free(var->value); var->value = val; @@ -637,7 +733,7 @@ parseVariable(const char *sql, int *eaten) if (i == 1) return NULL; - name = xmalloc(i); + name = pg_malloc(i); memcpy(name, &sql[1], i - 1); name[i - 1] = '\0'; @@ -654,7 +750,7 @@ replaceVariable(char **sql, char *param, int len, char *value) { size_t offset = param - *sql; - *sql = xrealloc(*sql, strlen(*sql) - len + valueln + 1); + *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1); param = *sql + offset; } @@ -836,23 +932,90 @@ clientDone(CState *st, bool ok) return false; /* always false */ } +static +void +agg_vals_init(AggVals *aggs, instr_time start) +{ + /* basic counters */ + aggs->cnt = 0; /* number of transactions */ + aggs->sum = 0; /* SUM(duration) */ + aggs->sum2 = 0; /* SUM(duration*duration) */ + + /* min and max transaction duration */ + aggs->min_duration = 0; + aggs->max_duration = 0; + + /* start of the current interval */ + aggs->start_time = INSTR_TIME_GET_DOUBLE(start); +} + /* return false iff client should be disconnected */ static bool -doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile) +doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg) { PGresult *res; Command **commands; + bool trans_needs_throttle = false; top: commands = sql_files[st->use_file]; + /* + * Handle throttling once per transaction by sleeping. It is simpler to + * do this here rather than at the end, because so much complicated logic + * happens below when statements finish. + */ + if (throttle_delay && !st->is_throttled) + { + /* + * Use inverse transform sampling to randomly generate a delay, such + * that the series of delays will approximate a Poisson distribution + * centered on the throttle_delay time. + * + * 10000 implies a 9.2 (-log(1/10000)) to 0.0 (log 1) delay + * multiplier, and results in a 0.055 % target underestimation bias: + * + * SELECT 1.0/AVG(-LN(i/10000.0)) FROM generate_series(1,10000) AS i; + * = 1.000552717032611116335474 + * + * If transactions are too slow or a given wait is shorter than a + * transaction, the next transaction will start right away. + */ + int64 wait = (int64) (throttle_delay * + 1.00055271703 * -log(getrand(thread, 1, 10000) / 10000.0)); + + thread->throttle_trigger += wait; + + st->until = thread->throttle_trigger; + st->sleeping = 1; + st->throttling = true; + st->is_throttled = true; + if (debug) + fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n", + st->id, wait); + } + if (st->sleeping) { /* are we sleeping? */ instr_time now; + int64 now_us; INSTR_TIME_SET_CURRENT(now); - if (st->until <= INSTR_TIME_GET_MICROSEC(now)) + now_us = INSTR_TIME_GET_MICROSEC(now); + if (st->until <= now_us) + { st->sleeping = 0; /* Done sleeping, go ahead with next command */ + if (st->throttling) + { + /* Measure lag of throttled transaction relative to target */ + int64 lag = now_us - st->until; + + thread->throttle_lag += lag; + if (lag > thread->throttle_lag_max) + thread->throttle_lag_max = lag; + st->throttling = false; + } + } else return true; /* Still sleeping, nothing to do here */ } @@ -887,6 +1050,27 @@ top: thread->exec_count[cnum]++; } + /* transaction finished: record latency under progress or throttling */ + if ((progress || throttle_delay) && commands[st->state + 1] == NULL) + { + instr_time diff; + int64 latency; + + INSTR_TIME_SET_CURRENT(diff); + INSTR_TIME_SUBTRACT(diff, st->txn_begin); + latency = INSTR_TIME_GET_MICROSEC(diff); + st->txn_latencies += latency; + + /* + * XXX In a long benchmark run of high-latency transactions, this + * int64 addition eventually overflows. For example, 100 threads + * running 10s transactions will overflow it in 2.56 hours. With + * a more-typical OLTP workload of .1s transactions, overflow + * would take 256 hours. + */ + st->txn_sqlats += latency * latency; + } + /* * if transaction finished, record the time it took in the log */ @@ -896,21 +1080,105 @@ top: instr_time diff; double usec; - INSTR_TIME_SET_CURRENT(now); - diff = now; - INSTR_TIME_SUBTRACT(diff, st->txn_begin); - usec = (double) INSTR_TIME_GET_MICROSEC(diff); + /* + * write the log entry if this row belongs to the random sample, + * or no sampling rate was given which means log everything. + */ + if (sample_rate == 0.0 || + pg_erand48(thread->random_state) <= sample_rate) + { + INSTR_TIME_SET_CURRENT(now); + diff = now; + INSTR_TIME_SUBTRACT(diff, st->txn_begin); + usec = (double) INSTR_TIME_GET_MICROSEC(diff); + + /* should we aggregate the results or not? */ + if (agg_interval > 0) + { + /* + * are we still in the same interval? if yes, accumulate + * the values (print them otherwise) + */ + if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now)) + { + agg->cnt += 1; + agg->sum += usec; + agg->sum2 += usec * usec; + + /* first in this aggregation interval */ + if ((agg->cnt == 1) || (usec < agg->min_duration)) + agg->min_duration = usec; + if ((agg->cnt == 1) || (usec > agg->max_duration)) + agg->max_duration = usec; + } + else + { + /* + * Loop until we reach the interval of the current + * transaction (and print all the empty intervals in + * between). + */ + while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now)) + { + /* + * This is a non-Windows branch (thanks to the + * ifdef in usage), so we don't need to handle + * this in a special way (see below). + */ + fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n", + agg->start_time, + agg->cnt, + agg->sum, + agg->sum2, + agg->min_duration, + agg->max_duration); + + /* move to the next inteval */ + agg->start_time = agg->start_time + agg_interval; + + /* reset for "no transaction" intervals */ + agg->cnt = 0; + agg->min_duration = 0; + agg->max_duration = 0; + agg->sum = 0; + agg->sum2 = 0; + } + + /* + * and now update the reset values (include the + * current) + */ + agg->cnt = 1; + agg->min_duration = usec; + agg->max_duration = usec; + agg->sum = usec; + agg->sum2 = usec * usec; + } + } + else + { + /* no, print raw transactions */ #ifndef WIN32 - /* This is more than we really ought to know about instr_time */ - fprintf(logfile, "%d %d %.0f %d %ld %ld\n", - st->id, st->cnt, usec, st->use_file, - (long) now.tv_sec, (long) now.tv_usec); + + /* + * This is more than we really ought to know about + * instr_time + */ + fprintf(logfile, "%d %d %.0f %d %ld %ld\n", + st->id, st->cnt, usec, st->use_file, + (long) now.tv_sec, (long) now.tv_usec); #else - /* On Windows, instr_time doesn't provide a timestamp anyway */ - fprintf(logfile, "%d %d %.0f %d 0 0\n", - st->id, st->cnt, usec, st->use_file); + + /* + * On Windows, instr_time doesn't provide a timestamp + * anyway + */ + fprintf(logfile, "%d %d %.0f %d 0 0\n", + st->id, st->cnt, usec, st->use_file); #endif + } + } } if (commands[st->state]->type == SQL_COMMAND) @@ -953,8 +1221,19 @@ top: if (commands[st->state] == NULL) { st->state = 0; - st->use_file = getrand(thread, 0, num_files - 1); + st->use_file = (int) getrand(thread, 0, num_files - 1); commands = sql_files[st->use_file]; + st->is_throttled = false; + + /* + * No transaction is underway anymore, which means there is + * nothing to listen to right now. When throttling rate limits + * are active, a sleep will happen next, as the next transaction + * starts. And then in any case the next SQL command will set + * listen back to 1. + */ + st->listen = 0; + trans_needs_throttle = (throttle_delay > 0); } } @@ -973,8 +1252,19 @@ top: INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); } - /* Record transaction start time if logging is enabled */ - if (logfile && st->state == 0) + /* + * This ensures that a throttling delay is inserted before proceeding with + * sql commands, after the first transaction. The first transaction + * throttling is performed when first entering doCustom. + */ + if (trans_needs_throttle) + { + trans_needs_throttle = false; + goto top; + } + + /* Record transaction start time under logging, progress or throttling */ + if ((logfile || progress || throttle_delay) && st->state == 0) INSTR_TIME_SET_CURRENT(st->txn_begin); /* Record statement start time if per-command latencies are requested */ @@ -990,7 +1280,7 @@ top: { char *sql; - sql = xstrdup(command->argv[0]); + sql = pg_strdup(command->argv[0]); sql = assignVariables(st, sql); if (debug) @@ -1073,7 +1363,7 @@ top: if (pg_strcasecmp(argv[0], "setrandom") == 0) { char *var; - int min, + int64 min, max; char res[64]; @@ -1085,10 +1375,10 @@ top: st->ecnt++; return true; } - min = atoi(var); + min = strtoint64(var); } else - min = atoi(argv[2]); + min = strtoint64(argv[2]); #ifdef NOT_USED if (min < 0) @@ -1107,10 +1397,10 @@ top: st->ecnt++; return true; } - max = atoi(var); + max = strtoint64(var); } else - max = atoi(argv[3]); + max = strtoint64(argv[3]); if (max < min) { @@ -1120,11 +1410,11 @@ top: } /* - * getrand() neeeds to be able to subtract max from min and add - * one the result without overflowing. Since we know max > min, - * we can detect overflow just by checking for a negative result. - * But we must check both that the subtraction doesn't overflow, - * and that adding one to the result doesn't overflow either. + * getrand() needs to be able to subtract max from min and add one + * to the result without overflowing. Since we know max > min, we + * can detect overflow just by checking for a negative result. But + * we must check both that the subtraction doesn't overflow, and + * that adding one to the result doesn't overflow either. */ if (max - min < 0 || (max - min) + 1 < 0) { @@ -1134,9 +1424,9 @@ top: } #ifdef DEBUG - printf("min: %d max: %d random: %d\n", min, max, getrand(thread, min, max)); + printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max)); #endif - snprintf(res, sizeof(res), "%d", getrand(thread, min, max)); + snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max)); if (!putVariable(st, argv[0], argv[1], res)) { @@ -1149,7 +1439,7 @@ top: else if (pg_strcasecmp(argv[0], "set") == 0) { char *var; - int ope1, + int64 ope1, ope2; char res[64]; @@ -1161,13 +1451,13 @@ top: st->ecnt++; return true; } - ope1 = atoi(var); + ope1 = strtoint64(var); } else - ope1 = atoi(argv[2]); + ope1 = strtoint64(argv[2]); if (argc < 5) - snprintf(res, sizeof(res), "%d", ope1); + snprintf(res, sizeof(res), INT64_FORMAT, ope1); else { if (*argv[4] == ':') @@ -1178,17 +1468,17 @@ top: st->ecnt++; return true; } - ope2 = atoi(var); + ope2 = strtoint64(var); } else - ope2 = atoi(argv[4]); + ope2 = strtoint64(argv[4]); if (strcmp(argv[3], "+") == 0) - snprintf(res, sizeof(res), "%d", ope1 + ope2); + snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2); else if (strcmp(argv[3], "-") == 0) - snprintf(res, sizeof(res), "%d", ope1 - ope2); + snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2); else if (strcmp(argv[3], "*") == 0) - snprintf(res, sizeof(res), "%d", ope1 * ope2); + snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2); else if (strcmp(argv[3], "/") == 0) { if (ope2 == 0) @@ -1197,7 +1487,7 @@ top: st->ecnt++; return true; } - snprintf(res, sizeof(res), "%d", ope1 / ope2); + snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2); } else { @@ -1302,31 +1592,44 @@ disconnect_all(CState *state, int length) /* create tables and setup data */ static void -init(void) +init(bool is_no_vacuum) { +/* + * The scale factor at/beyond which 32-bit integers are insufficient for + * storing TPC-B account IDs. + * + * Although the actual threshold is 21474, we use 20000 because it is easier to + * document and remember, and isn't that far away from the real threshold. + */ +#define SCALE_32BIT_THRESHOLD 20000 + /* * Note: TPC-B requires at least 100 bytes per row, and the "filler" * fields in these table declarations were intended to comply with that. - * But because they default to NULLs, they don't actually take any space. - * We could fix that by giving them non-null default values. However, that + * The pgbench_accounts table complies with that because the "filler" + * column is set to blank-padded empty string. But for all other tables + * the columns default to NULL and so don't actually take any space. We + * could fix that by giving them non-null default values. However, that * would completely break comparability of pgbench results with prior - * versions. Since pgbench has never pretended to be fully TPC-B - * compliant anyway, we stick with the historical behavior. + * versions. Since pgbench has never pretended to be fully TPC-B compliant + * anyway, we stick with the historical behavior. */ struct ddlinfo { - char *table; - char *cols; + const char *table; /* table name */ + const char *smcols; /* column decls if accountIDs are 32 bits */ + const char *bigcols; /* column decls if accountIDs are 64 bits */ int declare_fillfactor; #ifdef PGXC char *distribute_by; #endif }; - struct ddlinfo DDLs[] = { + static const struct ddlinfo DDLs[] = { { - "pgbench_branches", - "bid int not null,bbalance int,filler char(88)", - 1 + "pgbench_history", + "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)", + "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)", + 0 #ifdef PGXC , "distribute by hash (bid)" #endif @@ -1334,6 +1637,7 @@ init(void) { "pgbench_tellers", "tid int not null,bid int,tbalance int,filler char(84)", + "tid int not null,bid int,tbalance int,filler char(84)", 1 #ifdef PGXC , "distribute by hash (bid)" @@ -1341,27 +1645,35 @@ init(void) }, { "pgbench_accounts", - "aid int not null,bid int,abalance int,filler char(84)", + "aid int not null,bid int,abalance int,filler char(84)", + "aid bigint not null,bid int,abalance int,filler char(84)", 1 #ifdef PGXC , "distribute by hash (bid)" #endif }, { - "pgbench_history", - "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)", - 0 + "pgbench_branches", + "bid int not null,bbalance int,filler char(88)", + "bid int not null,bbalance int,filler char(88)", + 1 #ifdef PGXC , "distribute by hash (bid)" #endif } }; - - static char *DDLAFTERs[] = { + static const char *const DDLINDEXes[] = { "alter table pgbench_branches add primary key (bid)", "alter table pgbench_tellers add primary key (tid)", "alter table pgbench_accounts add primary key (aid)" }; + static const char *const DDLKEYs[] = { + "alter table pgbench_tellers add foreign key (bid) references pgbench_branches", + "alter table pgbench_accounts add foreign key (bid) references pgbench_branches", + "alter table pgbench_history add foreign key (bid) references pgbench_branches", + "alter table pgbench_history add foreign key (tid) references pgbench_tellers", + "alter table pgbench_history add foreign key (aid) references pgbench_accounts" + }; #ifdef PGXC static char *DDLAFTERs_bid[] = { @@ -1375,6 +1687,14 @@ init(void) PGresult *res; char sql[256]; int i; + int64 k; + + /* used to track elapsed time and estimate of the remaining time */ + instr_time start, + diff; + double elapsed_sec, + remaining_sec; + int log_interval = 1; if ((con = doConnect()) == NULL) exit(1); @@ -1383,16 +1703,17 @@ init(void) { char opts[256]; char buffer[256]; - struct ddlinfo *ddl = &DDLs[i]; + const struct ddlinfo *ddl = &DDLs[i]; + const char *cols; /* Remove old table, if it exists. */ - snprintf(buffer, 256, "drop table if exists %s", ddl->table); + snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table); executeStatement(con, buffer); /* Construct new create table statement. */ opts[0] = '\0'; if (ddl->declare_fillfactor) - snprintf(opts + strlen(opts), 256 - strlen(opts), + snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts), " with (fillfactor=%d)", fillfactor); if (tablespace != NULL) { @@ -1400,21 +1721,24 @@ init(void) escape_tablespace = PQescapeIdentifier(con, tablespace, strlen(tablespace)); - snprintf(opts + strlen(opts), 256 - strlen(opts), + snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts), " tablespace %s", escape_tablespace); PQfreemem(escape_tablespace); } + + cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols; + #ifdef PGXC /* Add distribution columns if necessary */ if (use_branch) - snprintf(buffer, 256, "create%s table %s(%s)%s %s", + snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s %s", unlogged_tables ? " unlogged" : "", ddl->table, ddl->cols, opts, ddl->distribute_by); else #endif - snprintf(buffer, 256, "create%s table %s(%s)%s", + snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s", unlogged_tables ? " unlogged" : "", - ddl->table, ddl->cols, opts); + ddl->table, cols, opts); executeStatement(con, buffer); } @@ -1423,13 +1747,18 @@ init(void) for (i = 0; i < nbranches * scale; i++) { - snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1); + /* "filler" column defaults to NULL */ + snprintf(sql, sizeof(sql), + "insert into pgbench_branches(bid,bbalance) values(%d,0)", + i + 1); executeStatement(con, sql); } for (i = 0; i < ntellers * scale; i++) { - snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)", + /* "filler" column defaults to NULL */ + snprintf(sql, sizeof(sql), + "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)", i + 1, i / ntellers + 1); executeStatement(con, sql); } @@ -1452,19 +1781,60 @@ init(void) } PQclear(res); - for (i = 0; i < naccounts * scale; i++) + INSTR_TIME_SET_CURRENT(start); + + for (k = 0; k < (int64) naccounts * scale; k++) { - int j = i + 1; + int64 j = k + 1; - snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0); + /* "filler" column defaults to blank padded empty string */ + snprintf(sql, sizeof(sql), + INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n", + j, k / naccounts + 1, 0); if (PQputline(con, sql)) { fprintf(stderr, "PQputline failed\n"); exit(1); } - if (j % 10000 == 0) - fprintf(stderr, "%d tuples done.\n", j); + /* + * If we want to stick with the original logging, print a message each + * 100k inserted rows. + */ + if ((!use_quiet) && (j % 100000 == 0)) + { + INSTR_TIME_SET_CURRENT(diff); + INSTR_TIME_SUBTRACT(diff, start); + + elapsed_sec = INSTR_TIME_GET_DOUBLE(diff); + remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j; + + fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n", + j, (int64) naccounts * scale, + (int) (((int64) j * 100) / (naccounts * (int64) scale)), + elapsed_sec, remaining_sec); + } + /* let's not call the timing for each row, but only each 100 rows */ + else if (use_quiet && (j % 100 == 0)) + { + INSTR_TIME_SET_CURRENT(diff); + INSTR_TIME_SUBTRACT(diff, start); + + elapsed_sec = INSTR_TIME_GET_DOUBLE(diff); + remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j; + + /* have we reached the next interval (or end)? */ + if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) + { + fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n", + j, (int64) naccounts * scale, + (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec); + + /* skip to the next interval */ + log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS); + } + } + } if (PQputline(con, "\\.\n")) { @@ -1478,6 +1848,16 @@ init(void) } executeStatement(con, "commit"); + /* vacuum */ + if (!is_no_vacuum) + { + fprintf(stderr, "vacuum...\n"); + executeStatement(con, "vacuum analyze pgbench_branches"); + executeStatement(con, "vacuum analyze pgbench_tellers"); + executeStatement(con, "vacuum analyze pgbench_accounts"); + executeStatement(con, "vacuum analyze pgbench_history"); + } + /* * create indexes */ @@ -1511,11 +1891,11 @@ init(void) } else #endif - for (i = 0; i < lengthof(DDLAFTERs); i++) + for (i = 0; i < lengthof(DDLINDEXes); i++) { char buffer[256]; - strncpy(buffer, DDLAFTERs[i], 256); + strlcpy(buffer, DDLINDEXes[i], sizeof(buffer)); if (index_tablespace != NULL) { @@ -1523,7 +1903,7 @@ init(void) escape_tablespace = PQescapeIdentifier(con, index_tablespace, strlen(index_tablespace)); - snprintf(buffer + strlen(buffer), 256 - strlen(buffer), + snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer), " using index tablespace %s", escape_tablespace); PQfreemem(escape_tablespace); } @@ -1531,12 +1911,17 @@ init(void) executeStatement(con, buffer); } - /* vacuum */ - fprintf(stderr, "vacuum..."); - executeStatement(con, "vacuum analyze pgbench_branches"); - executeStatement(con, "vacuum analyze pgbench_tellers"); - executeStatement(con, "vacuum analyze pgbench_accounts"); - executeStatement(con, "vacuum analyze pgbench_history"); + /* + * create foreign keys + */ + if (foreign_keys) + { + fprintf(stderr, "set foreign keys...\n"); + for (i = 0; i < lengthof(DDLKEYs); i++) + { + executeStatement(con, DDLKEYs[i]); + } + } fprintf(stderr, "done.\n"); PQfinish(con); @@ -1551,7 +1936,7 @@ parseQuery(Command *cmd, const char *raw_sql) char *sql, *p; - sql = xstrdup(raw_sql); + sql = pg_strdup(raw_sql); cmd->argc = 1; p = sql; @@ -1613,8 +1998,8 @@ process_commands(char *buf) return NULL; /* Allocate and initialize Command structure */ - my_commands = (Command *) xmalloc(sizeof(Command)); - my_commands->line = xstrdup(buf); + my_commands = (Command *) pg_malloc(sizeof(Command)); + my_commands->line = pg_strdup(buf); my_commands->command_num = num_commands++; my_commands->type = 0; /* until set */ my_commands->argc = 0; @@ -1628,7 +2013,7 @@ process_commands(char *buf) while (tok != NULL) { - my_commands->argv[j++] = xstrdup(tok); + my_commands->argv[j++] = pg_strdup(tok); my_commands->argc++; tok = strtok(NULL, delim); } @@ -1730,7 +2115,7 @@ process_commands(char *buf) switch (querymode) { case QUERY_SIMPLE: - my_commands->argv[0] = xstrdup(p); + my_commands->argv[0] = pg_strdup(p); my_commands->argc++; break; case QUERY_EXTENDED: @@ -1746,6 +2131,49 @@ process_commands(char *buf) return my_commands; } +/* + * Read a line from fd, and return it in a malloc'd buffer. + * Return NULL at EOF. + * + * The buffer will typically be larger than necessary, but we don't care + * in this program, because we'll free it as soon as we've parsed the line. + */ +static char * +read_line_from_file(FILE *fd) +{ + char tmpbuf[BUFSIZ]; + char *buf; + size_t buflen = BUFSIZ; + size_t used = 0; + + buf = (char *) palloc(buflen); + buf[0] = '\0'; + + while (fgets(tmpbuf, BUFSIZ, fd) != NULL) + { + size_t thislen = strlen(tmpbuf); + + /* Append tmpbuf to whatever we had already */ + memcpy(buf + used, tmpbuf, thislen + 1); + used += thislen; + + /* Done if we collected a newline */ + if (thislen > 0 && tmpbuf[thislen - 1] == '\n') + break; + + /* Else, enlarge buf to ensure we can append next bufferload */ + buflen += BUFSIZ; + buf = (char *) pg_realloc(buf, buflen); + } + + if (used > 0) + return buf; + + /* Reached EOF */ + free(buf); + return NULL; +} + static int process_file(char *filename) { @@ -1754,7 +2182,7 @@ process_file(char *filename) Command **my_commands; FILE *fd; int lineno; - char buf[BUFSIZ]; + char *buf; int alloc_num; if (num_files >= MAX_FILES) @@ -1764,7 +2192,7 @@ process_file(char *filename) } alloc_num = COMMANDS_ALLOC_NUM; - my_commands = (Command **) xmalloc(sizeof(Command *) * alloc_num); + my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num); if (strcmp(filename, "-") == 0) fd = stdin; @@ -1776,11 +2204,14 @@ process_file(char *filename) lineno = 0; - while (fgets(buf, sizeof(buf), fd) != NULL) + while ((buf = read_line_from_file(fd)) != NULL) { Command *command; command = process_commands(buf); + + free(buf); + if (command == NULL) continue; @@ -1790,7 +2221,7 @@ process_file(char *filename) if (lineno >= alloc_num) { alloc_num += COMMANDS_ALLOC_NUM; - my_commands = xrealloc(my_commands, sizeof(Command *) * alloc_num); + my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num); } } fclose(fd); @@ -1813,7 +2244,7 @@ process_builtin(char *tb) int alloc_num; alloc_num = COMMANDS_ALLOC_NUM; - my_commands = (Command **) xmalloc(sizeof(Command *) * alloc_num); + my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num); lineno = 0; @@ -1844,7 +2275,7 @@ process_builtin(char *tb) if (lineno >= alloc_num) { alloc_num += COMMANDS_ALLOC_NUM; - my_commands = xrealloc(my_commands, sizeof(Command *) * alloc_num); + my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num); } } @@ -1855,9 +2286,11 @@ process_builtin(char *tb) /* print out results */ static void -printResults(int ttype, int normal_xacts, int nclients, +printResults(int ttype, int64 normal_xacts, int nclients, TState *threads, int nthreads, - instr_time total_time, instr_time conn_total_time) + instr_time total_time, instr_time conn_total_time, + int64 total_latencies, int64 total_sqlats, + int64 throttle_lag, int64 throttle_lag_max) { double time_include, tps_include, @@ -1886,15 +2319,45 @@ printResults(int ttype, int normal_xacts, int nclients, if (duration <= 0) { printf("number of transactions per client: %d\n", nxacts); - printf("number of transactions actually processed: %d/%d\n", - normal_xacts, nxacts * nclients); + printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n", + normal_xacts, (int64) nxacts * nclients); } else { printf("duration: %d s\n", duration); - printf("number of transactions actually processed: %d\n", + printf("number of transactions actually processed: " INT64_FORMAT "\n", normal_xacts); } + + if (throttle_delay || progress) + { + /* compute and show latency average and standard deviation */ + double latency = 0.001 * total_latencies / normal_xacts; + double sqlat = (double) total_sqlats / normal_xacts; + + printf("latency average: %.3f ms\n" + "latency stddev: %.3f ms\n", + latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency)); + } + else + { + /* only an average latency computed from the duration is available */ + printf("latency average: %.3f ms\n", + 1000.0 * duration * nclients / normal_xacts); + } + + if (throttle_delay) + { + /* + * Report average transaction lag under rate limit throttling. This + * is the delay between scheduled and actual start times for the + * transaction. The measured lag may be caused by thread/client load, + * the database load, or the Poisson throttling process. + */ + printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n", + 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max); + } + printf("tps = %f (including connections establishing)\n", tps_include); printf("tps = %f (excluding connections establishing)\n", tps_exclude); @@ -1948,6 +2411,42 @@ printResults(int ttype, int normal_xacts, int nclients, int main(int argc, char **argv) { + static struct option long_options[] = { + /* systematic long/short named options */ + {"client", required_argument, NULL, 'c'}, + {"connect", no_argument, NULL, 'C'}, + {"debug", no_argument, NULL, 'd'}, + {"define", required_argument, NULL, 'D'}, + {"file", required_argument, NULL, 'f'}, + {"fillfactor", required_argument, NULL, 'F'}, + {"host", required_argument, NULL, 'h'}, + {"initialize", no_argument, NULL, 'i'}, + {"jobs", required_argument, NULL, 'j'}, + {"log", no_argument, NULL, 'l'}, + {"no-vacuum", no_argument, NULL, 'n'}, + {"port", required_argument, NULL, 'p'}, + {"progress", required_argument, NULL, 'P'}, + {"protocol", required_argument, NULL, 'M'}, + {"quiet", no_argument, NULL, 'q'}, + {"report-latencies", no_argument, NULL, 'r'}, + {"scale", required_argument, NULL, 's'}, + {"select-only", no_argument, NULL, 'S'}, + {"skip-some-updates", no_argument, NULL, 'N'}, + {"time", required_argument, NULL, 'T'}, + {"transactions", required_argument, NULL, 't'}, + {"username", required_argument, NULL, 'U'}, + {"vacuum-all", no_argument, NULL, 'v'}, + /* long-named only options */ + {"foreign-keys", no_argument, &foreign_keys, 1}, + {"index-tablespace", required_argument, NULL, 3}, + {"tablespace", required_argument, NULL, 2}, + {"unlogged-tables", no_argument, &unlogged_tables, 1}, + {"sampling-rate", required_argument, NULL, 4}, + {"aggregate-interval", required_argument, NULL, 5}, + {"rate", required_argument, NULL, 'R'}, + {NULL, 0, NULL, 0} + }; + int c; int nclients = 1; /* default number of simulated clients */ int nthreads = 1; /* default number of threads */ @@ -1966,17 +2465,14 @@ main(int argc, char **argv) instr_time start_time; /* start up time */ instr_time total_time; instr_time conn_total_time; - int total_xacts; + int64 total_xacts = 0; + int64 total_latencies = 0; + int64 total_sqlats = 0; + int64 throttle_lag = 0; + int64 throttle_lag_max = 0; int i; - static struct option long_options[] = { - {"index-tablespace", required_argument, NULL, 3}, - {"tablespace", required_argument, NULL, 2}, - {"unlogged-tables", no_argument, &unlogged_tables, 1}, - {NULL, 0, NULL, 0} - }; - #ifdef HAVE_GETRLIMIT struct rlimit rlim; #endif @@ -1987,15 +2483,13 @@ main(int argc, char **argv) char val[64]; - const char *progname; - progname = get_progname(argv[0]); if (argc > 1) { if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) { - usage(progname); + usage(); exit(0); } if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0) @@ -2017,13 +2511,13 @@ main(int argc, char **argv) else if ((env = getenv("PGUSER")) != NULL && *env != '\0') login = env; - state = (CState *) xmalloc(sizeof(CState)); + state = (CState *) pg_malloc(sizeof(CState)); memset(state, 0, sizeof(CState)); #ifdef PGXC - while ((c = getopt_long(argc, argv, "ih:knvp:dSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "ih:knvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1) #else - while ((c = getopt_long(argc, argv, "ih:nvp:dSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1) #endif { switch (c) @@ -2037,7 +2531,7 @@ main(int argc, char **argv) break; #endif case 'h': - pghost = optarg; + pghost = pg_strdup(optarg); break; case 'n': is_no_vacuum++; @@ -2046,7 +2540,7 @@ main(int argc, char **argv) do_vacuum_accounts++; break; case 'p': - pgport = optarg; + pgport = pg_strdup(optarg); break; case 'd': debug++; @@ -2132,14 +2626,17 @@ main(int argc, char **argv) } break; case 'U': - login = optarg; + login = pg_strdup(optarg); break; case 'l': use_log = true; break; + case 'q': + use_quiet = true; + break; case 'f': ttype = 3; - filename = optarg; + filename = pg_strdup(optarg); if (process_file(filename) == false || *sql_files[num_files - 1] == NULL) exit(1); break; @@ -2181,14 +2678,59 @@ main(int argc, char **argv) exit(1); } break; + case 'P': + progress = atoi(optarg); + if (progress <= 0) + { + fprintf(stderr, + "thread progress delay (-P) must be positive (%s)\n", + optarg); + exit(1); + } + break; + case 'R': + { + /* get a double from the beginning of option value */ + double throttle_value = atof(optarg); + + if (throttle_value <= 0.0) + { + fprintf(stderr, "invalid rate limit: %s\n", optarg); + exit(1); + } + /* Invert rate limit into a time offset */ + throttle_delay = (int64) (1000000.0 / throttle_value); + } + break; case 0: /* This covers long options which take no argument. */ break; case 2: /* tablespace */ - tablespace = optarg; + tablespace = pg_strdup(optarg); break; case 3: /* index-tablespace */ - index_tablespace = optarg; + index_tablespace = pg_strdup(optarg); + break; + case 4: + sample_rate = atof(optarg); + if (sample_rate <= 0.0 || sample_rate > 1.0) + { + fprintf(stderr, "invalid sampling rate: %f\n", sample_rate); + exit(1); + } + break; + case 5: +#ifdef WIN32 + fprintf(stderr, "--aggregate-interval is not currently supported on Windows"); + exit(1); +#else + agg_interval = atoi(optarg); + if (agg_interval <= 0) + { + fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval); + exit(1); + } +#endif break; default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); @@ -2197,6 +2739,9 @@ main(int argc, char **argv) } } + /* compute a per thread delay */ + throttle_delay *= nthreads; + if (argc > optind) dbName = argv[optind]; else @@ -2211,7 +2756,7 @@ main(int argc, char **argv) if (is_init_mode) { - init(); + init(is_no_vacuum); exit(0); } @@ -2225,6 +2770,45 @@ main(int argc, char **argv) exit(1); } + /* --sampling-rate may be used only with -l */ + if (sample_rate > 0.0 && !use_log) + { + fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n"); + exit(1); + } + + /* -q may be used only with -i */ + if (use_quiet && !is_init_mode) + { + fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n"); + exit(1); + } + + /* --sampling-rate may must not be used with --aggregate-interval */ + if (sample_rate > 0.0 && agg_interval > 0) + { + fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n"); + exit(1); + } + + if (agg_interval > 0 && (!use_log)) + { + fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n"); + exit(1); + } + + if ((duration > 0) && (agg_interval > duration)) + { + fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration); + exit(1); + } + + if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0)) + { + fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval); + exit(1); + } + /* * is_latencies only works with multiple threads in thread-based * implementations, not fork-based ones, because it supposes that the @@ -2246,10 +2830,12 @@ main(int argc, char **argv) * changed after fork. */ main_pid = (int) getpid(); + progress_nclients = nclients; + progress_nthreads = nthreads; if (nclients > 1) { - state = (CState *) xrealloc(state, sizeof(CState) * nclients); + state = (CState *) pg_realloc(state, sizeof(CState) * nclients); memset(state + 1, 0, sizeof(CState) * (nclients - 1)); /* copy any -D switch values to all clients */ @@ -2329,6 +2915,20 @@ main(int argc, char **argv) } } + /* + * Define a :client_id variable that is unique per connection. But don't + * override an explicit -D switch. + */ + if (getVariable(&state[0], "client_id") == NULL) + { + for (i = 0; i < nclients; i++) + { + snprintf(val, sizeof(val), "%d", i); + if (!putVariable(&state[i], "startup", "client_id", val)) + exit(1); + } + } + if (!is_no_vacuum) { fprintf(stderr, "starting vacuum..."); @@ -2383,7 +2983,7 @@ main(int argc, char **argv) } /* set up thread data structures */ - threads = (TState *) xmalloc(sizeof(TState) * nthreads); + threads = (TState *) pg_malloc(sizeof(TState) * nthreads); for (i = 0; i < nthreads; i++) { TState *thread = &threads[i]; @@ -2401,9 +3001,9 @@ main(int argc, char **argv) int t; thread->exec_elapsed = (instr_time *) - xmalloc(sizeof(instr_time) * num_commands); + pg_malloc(sizeof(instr_time) * num_commands); thread->exec_count = (int *) - xmalloc(sizeof(int) * num_commands); + pg_malloc(sizeof(int) * num_commands); for (t = 0; t < num_commands; t++) { @@ -2450,7 +3050,6 @@ main(int argc, char **argv) } /* wait for threads and accumulate results */ - total_xacts = 0; INSTR_TIME_SET_ZERO(conn_total_time); for (i = 0; i < nthreads; i++) { @@ -2466,17 +3065,32 @@ main(int argc, char **argv) TResult *r = (TResult *) ret; total_xacts += r->xacts; + total_latencies += r->latencies; + total_sqlats += r->sqlats; + throttle_lag += r->throttle_lag; + if (r->throttle_lag_max > throttle_lag_max) + throttle_lag_max = r->throttle_lag_max; INSTR_TIME_ADD(conn_total_time, r->conn_time); free(ret); } } disconnect_all(state, nclients); - /* get end time */ + /* + * XXX We compute results as though every client of every thread started + * and finished at the same time. That model can diverge noticeably from + * reality for a short benchmark run involving relatively many threads. + * The first thread may process notably many transactions before the last + * thread begins. Improving the model alone would bring limited benefit, + * because performance during those periods of partial thread count can + * easily exceed steady state performance. This is one of the many ways + * short runs convey deceptive performance figures. + */ INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); printResults(ttype, total_xacts, nclients, threads, nthreads, - total_time, conn_total_time); + total_time, conn_total_time, total_latencies, total_sqlats, + throttle_lag, throttle_lag_max); return 0; } @@ -2494,7 +3108,30 @@ threadRun(void *arg) int remains = nstate; /* number of remaining clients */ int i; - result = xmalloc(sizeof(TResult)); + /* for reporting progress: */ + int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time); + int64 last_report = thread_start; + int64 next_report = last_report + (int64) progress * 1000000; + int64 last_count = 0, + last_lats = 0, + last_sqlats = 0, + last_lags = 0; + + AggVals aggs; + + /* + * Initialize throttling rate target for all of the thread's clients. It + * might be a little more accurate to reset thread->start_time here too. + * The possible drift seems too small relative to typical throttle delay + * times to worry about it. + */ + INSTR_TIME_SET_CURRENT(start); + thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start); + thread->throttle_lag = 0; + thread->throttle_lag_max = 0; + + result = pg_malloc(sizeof(TResult)); + INSTR_TIME_SET_ZERO(result->conn_time); /* open log file if requested */ @@ -2529,6 +3166,8 @@ threadRun(void *arg) INSTR_TIME_SET_CURRENT(result->conn_time); INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time); + agg_vals_init(&aggs, thread->start_time); + /* send start up queries in async manner */ for (i = 0; i < nstate; i++) { @@ -2537,7 +3176,7 @@ threadRun(void *arg) int prev_ecnt = st->ecnt; st->use_file = getrand(thread, 0, num_files - 1); - if (!doCustom(thread, st, &result->conn_time, logfile)) + if (!doCustom(thread, st, &result->conn_time, logfile, &aggs)) remains--; /* I've aborted */ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) @@ -2566,25 +3205,38 @@ threadRun(void *arg) Command **commands = sql_files[st->use_file]; int sock; - if (st->sleeping) + if (st->con == NULL) { - int this_usec; - - if (min_usec == INT64_MAX) + continue; + } + else if (st->sleeping) + { + if (st->throttling && timer_exceeded) { - instr_time now; - - INSTR_TIME_SET_CURRENT(now); - now_usec = INSTR_TIME_GET_MICROSEC(now); + /* interrupt client which has not started a transaction */ + remains--; + st->sleeping = 0; + st->throttling = false; + PQfinish(st->con); + st->con = NULL; + continue; } + else /* just a nap from the script */ + { + int this_usec; - this_usec = st->until - now_usec; - if (min_usec > this_usec) - min_usec = this_usec; - } - else if (st->con == NULL) - { - continue; + if (min_usec == INT64_MAX) + { + instr_time now; + + INSTR_TIME_SET_CURRENT(now); + now_usec = INSTR_TIME_GET_MICROSEC(now); + } + + this_usec = st->until - now_usec; + if (min_usec > this_usec) + min_usec = this_usec; + } } else if (commands[st->state]->type == META_COMMAND) { @@ -2639,7 +3291,7 @@ threadRun(void *arg) if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) || commands[st->state]->type == META_COMMAND)) { - if (!doCustom(thread, st, &result->conn_time, logfile)) + if (!doCustom(thread, st, &result->conn_time, logfile, &aggs)) remains--; /* I've aborted */ } @@ -2651,14 +3303,141 @@ threadRun(void *arg) st->con = NULL; } } + +#ifdef PTHREAD_FORK_EMULATION + /* each process reports its own progression */ + if (progress) + { + instr_time now_time; + int64 now; + + INSTR_TIME_SET_CURRENT(now_time); + now = INSTR_TIME_GET_MICROSEC(now_time); + if (now >= next_report) + { + /* generate and show report */ + int64 count = 0, + lats = 0, + sqlats = 0; + int64 lags = thread->throttle_lag; + int64 run = now - last_report; + double tps, + total_run, + latency, + sqlat, + stdev, + lag; + + for (i = 0; i < nstate; i++) + { + count += state[i].cnt; + lats += state[i].txn_latencies; + sqlats += state[i].txn_sqlats; + } + + total_run = (now - thread_start) / 1000000.0; + tps = 1000000.0 * (count - last_count) / run; + latency = 0.001 * (lats - last_lats) / (count - last_count); + sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); + stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); + lag = 0.001 * (lags - last_lags) / (count - last_count); + + if (throttle_delay) + fprintf(stderr, + "progress %d: %.1f s, %.1f tps, " + "lat %.3f ms stddev %.3f, lag %.3f ms\n", + thread->tid, total_run, tps, latency, stdev, lag); + else + fprintf(stderr, + "progress %d: %.1f s, %.1f tps, " + "lat %.3f ms stddev %.3f\n", + thread->tid, total_run, tps, latency, stdev); + + last_count = count; + last_lats = lats; + last_sqlats = sqlats; + last_lags = lags; + last_report = now; + next_report += (int64) progress *1000000; + } + } +#else + /* progress report by thread 0 for all threads */ + if (progress && thread->tid == 0) + { + instr_time now_time; + int64 now; + + INSTR_TIME_SET_CURRENT(now_time); + now = INSTR_TIME_GET_MICROSEC(now_time); + if (now >= next_report) + { + /* generate and show report */ + int64 count = 0, + lats = 0, + sqlats = 0, + lags = 0; + int64 run = now - last_report; + double tps, + total_run, + latency, + sqlat, + lag, + stdev; + + for (i = 0; i < progress_nclients; i++) + { + count += state[i].cnt; + lats += state[i].txn_latencies; + sqlats += state[i].txn_sqlats; + } + + for (i = 0; i < progress_nthreads; i++) + lags += thread[i].throttle_lag; + + total_run = (now - thread_start) / 1000000.0; + tps = 1000000.0 * (count - last_count) / run; + latency = 0.001 * (lats - last_lats) / (count - last_count); + sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); + stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); + lag = 0.001 * (lags - last_lags) / (count - last_count); + + if (throttle_delay) + fprintf(stderr, + "progress: %.1f s, %.1f tps, " + "lat %.3f ms stddev %.3f, lag %.3f ms\n", + total_run, tps, latency, stdev, lag); + else + fprintf(stderr, + "progress: %.1f s, %.1f tps, " + "lat %.3f ms stddev %.3f\n", + total_run, tps, latency, stdev); + + last_count = count; + last_lats = lats; + last_sqlats = sqlats; + last_lags = lags; + last_report = now; + next_report += (int64) progress *1000000; + } + } +#endif /* PTHREAD_FORK_EMULATION */ } done: INSTR_TIME_SET_CURRENT(start); disconnect_all(state, nstate); result->xacts = 0; + result->latencies = 0; + result->sqlats = 0; for (i = 0; i < nstate; i++) + { result->xacts += state[i].cnt; + result->latencies += state[i].txn_latencies; + result->sqlats += state[i].txn_sqlats; + } + result->throttle_lag = thread->throttle_lag; + result->throttle_lag_max = thread->throttle_lag_max; INSTR_TIME_SET_CURRENT(end); INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); if (logfile) @@ -2666,7 +3445,6 @@ done: return result; } - /* * Support for duration option: set timer_exceeded after so many seconds. */ @@ -2706,8 +3484,9 @@ pthread_create(pthread_t *thread, { fork_pthread *th; void *ret; + int rc; - th = (fork_pthread *) xmalloc(sizeof(fork_pthread)); + th = (fork_pthread *) pg_malloc(sizeof(fork_pthread)); if (pipe(th->pipes) < 0) { free(th); @@ -2735,7 +3514,8 @@ pthread_create(pthread_t *thread, setalarm(duration); ret = start_routine(arg); - write(th->pipes[1], ret, sizeof(TResult)); + rc = write(th->pipes[1], ret, sizeof(TResult)); + (void) rc; close(th->pipes[1]); free(th); exit(0); @@ -2755,7 +3535,7 @@ pthread_join(pthread_t th, void **thread_return) if (thread_return != NULL) { /* assume result is TResult */ - *thread_return = xmalloc(sizeof(TResult)); + *thread_return = pg_malloc(sizeof(TResult)); if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult)) { free(*thread_return); @@ -2823,7 +3603,7 @@ pthread_create(pthread_t *thread, int save_errno; win32_pthread *th; - th = (win32_pthread *) xmalloc(sizeof(win32_pthread)); + th = (win32_pthread *) pg_malloc(sizeof(win32_pthread)); th->routine = start_routine; th->arg = arg; th->result = NULL; |