summaryrefslogtreecommitdiff
path: root/contrib/pgbench/pgbench.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/pgbench/pgbench.c')
-rw-r--r--contrib/pgbench/pgbench.c1282
1 files changed, 1031 insertions, 251 deletions
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 24b0c7e105..534fd72150 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -5,7 +5,7 @@
* Originally written by Tatsuo Ishii and enhanced by many contributors.
*
* contrib/pgbench/pgbench.c
- * Copyright (c) 2000-2012, PostgreSQL Global Development Group
+ * Copyright (c) 2000-2014, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
@@ -35,16 +35,12 @@
#include "getopt_long.h"
#include "libpq-fe.h"
-#include "libpq/pqsignal.h"
#include "portability/instr_time.h"
#include <ctype.h>
-
-#ifndef WIN32
+#include <math.h>
+#include <signal.h>
#include <sys/time.h>
-#include <unistd.h>
-#endif /* ! WIN32 */
-
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
@@ -73,7 +69,7 @@ static int pthread_join(pthread_t th, void **thread_return);
#include <pthread.h>
#else
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
-
+#define PTHREAD_FORK_EMULATION
#include <sys/wait.h>
#define pthread_t pg_pthread_t
@@ -88,9 +84,6 @@ static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start
static int pthread_join(pthread_t th, void **thread_return);
#endif
-extern char *optarg;
-extern int optind;
-
/********************************************************************
* some configurable parameters */
@@ -102,6 +95,7 @@ extern int optind;
#define MAXCLIENTS 1024
#endif
+#define LOG_STEP_SECONDS 5 /* seconds between log messages */
#define DEFAULT_NXACTS 10 /* default nxacts */
int nxacts = 0; /* number of transactions per client */
@@ -120,11 +114,27 @@ int scale = 1;
int fillfactor = 100;
/*
+ * create foreign key constraints on the tables?
+ */
+int foreign_keys = 0;
+
+/*
* use unlogged tables?
*/
int unlogged_tables = 0;
/*
+ * log sampling rate (1.0 = log everything, 0.0 = option not given)
+ */
+double sample_rate = 0.0;
+
+/*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec. 0 is the default and means no throttling.
+ */
+int64 throttle_delay = 0;
+
+/*
* tablespace selection
*/
char *tablespace = NULL;
@@ -142,17 +152,33 @@ char *index_tablespace = NULL;
#ifdef PGXC
bool use_branch = false; /* use branch id in DDL and DML */
#endif
+/*
+ * The scale factor at/beyond which 32bit integers are incapable of storing
+ * 64bit values.
+ *
+ * Although the actual threshold is 21474, we use 20000 because it is easier to
+ * document and remember, and isn't that far away from the real threshold.
+ */
+#define SCALE_32BIT_THRESHOLD 20000
+
bool use_log; /* log transaction latencies to a file */
+bool use_quiet; /* quiet logging onto stderr */
+int agg_interval; /* log aggregates instead of individual
+ * transactions */
+int progress = 0; /* thread progress report every this seconds */
+int progress_nclients = 0; /* number of clients for progress
+ * report */
+int progress_nthreads = 0; /* number of threads for progress
+ * report */
bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */
int main_pid; /* main process id used in log filename */
char *pghost = "";
char *pgport = "";
-char *pgoptions = NULL;
-char *pgtty = NULL;
char *login = NULL;
char *dbName;
+const char *progname;
volatile bool timer_exceeded = false; /* flag from signal handler */
@@ -180,11 +206,15 @@ typedef struct
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
+ bool throttling; /* whether nap is for throttling */
int64 until; /* napping until (usec) */
Variable *variables; /* array of variable definitions */
int nvariables;
instr_time txn_begin; /* used for measuring transaction latencies */
instr_time stmt_begin; /* used for measuring statement latencies */
+ int64 txn_latencies; /* cumulated latencies */
+ int64 txn_sqlats; /* cumulated square latencies */
+ bool is_throttled; /* whether transaction throttling is done */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
} CState;
@@ -202,6 +232,9 @@ typedef struct
instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
+ int64 throttle_trigger; /* previous/next throttling (us) */
+ int64 throttle_lag; /* total transaction lag behind throttling */
+ int64 throttle_lag_max; /* max transaction lag */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -209,7 +242,11 @@ typedef struct
typedef struct
{
instr_time conn_time;
- int xacts;
+ int64 xacts;
+ int64 latencies;
+ int64 sqlats;
+ int64 throttle_lag;
+ int64 throttle_lag_max;
} TResult;
/*
@@ -239,6 +276,19 @@ typedef struct
char *argv[MAX_ARGS]; /* command word list */
} Command;
+typedef struct
+{
+
+ long start_time; /* when does the interval start */
+ int cnt; /* number of transactions */
+ double min_duration; /* min/max durations */
+ double max_duration;
+ double sum; /* sum(duration), sum(duration^2) - for
+ * estimates */
+ double sum2;
+
+} AggVals;
+
static Command **sql_files[MAX_FILES]; /* SQL script files */
static int num_files; /* number of script files */
static int num_commands = 0; /* total number of Command structs */
@@ -326,108 +376,134 @@ static char *select_only = {
static void setalarm(int seconds);
static void *threadRun(void *arg);
-
-/*
- * routines to check mem allocations and fail noisily.
- */
-static void *
-xmalloc(size_t size)
-{
- void *result;
-
- result = malloc(size);
- if (!result)
- {
- fprintf(stderr, "out of memory\n");
- exit(1);
- }
- return result;
-}
-
-static void *
-xrealloc(void *ptr, size_t size)
-{
- void *result;
-
- result = realloc(ptr, size);
- if (!result)
- {
- fprintf(stderr, "out of memory\n");
- exit(1);
- }
- return result;
-}
-
-static char *
-xstrdup(const char *s)
-{
- char *result;
-
- result = strdup(s);
- if (!result)
- {
- fprintf(stderr, "out of memory\n");
- exit(1);
- }
- return result;
-}
-
-
static void
-usage(const char *progname)
+usage(void)
{
printf("%s is a benchmarking tool for PostgreSQL.\n\n"
"Usage:\n"
" %s [OPTION]... [DBNAME]\n"
"\nInitialization options:\n"
- " -i invokes initialization mode\n"
- " -F NUM fill factor\n"
+ " -i, --initialize invokes initialization mode\n"
+ " -F, --fillfactor=NUM set fill factor\n"
#ifdef PGXC
" -k distribute by primary key branch id - bid\n"
#endif
- " -s NUM scaling factor\n"
+ " -n, --no-vacuum do not run VACUUM after initialization\n"
+ " -q, --quiet quiet logging (one message each 5 seconds)\n"
+ " -s, --scale=NUM scaling factor\n"
+ " --foreign-keys create foreign key constraints between tables\n"
" --index-tablespace=TABLESPACE\n"
- " create indexes in the specified tablespace\n"
- " --tablespace=TABLESPACE\n"
- " create tables in the specified tablespace\n"
- " --unlogged-tables\n"
- " create tables as unlogged tables\n"
+ " create indexes in the specified tablespace\n"
+ " --tablespace=TABLESPACE create tables in the specified tablespace\n"
+ " --unlogged-tables create tables as unlogged tables\n"
"\nBenchmarking options:\n"
- " -c NUM number of concurrent database clients (default: 1)\n"
- " -C establish new connection for each transaction\n"
- " -D VARNAME=VALUE\n"
- " define variable for use by custom script\n"
- " -f FILENAME read transaction script from FILENAME\n"
+ " -c, --client=NUM number of concurrent database clients (default: 1)\n"
+ " -C, --connect establish new connection for each transaction\n"
+ " -D, --define=VARNAME=VALUE\n"
+ " define variable for use by custom script\n"
+ " -f, --file=FILENAME read transaction script from FILENAME\n"
#ifdef PGXC
" -k query with default key and additional key branch id (bid)\n"
#endif
- " -j NUM number of threads (default: 1)\n"
- " -l write transaction times to log file\n"
- " -M simple|extended|prepared\n"
- " protocol for submitting queries to server (default: simple)\n"
- " -n do not run VACUUM before tests\n"
- " -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
- " -r report average latency per command\n"
- " -s NUM report this scale factor in output\n"
- " -S perform SELECT-only transactions\n"
- " -t NUM number of transactions each client runs (default: 10)\n"
- " -T NUM duration of benchmark test in seconds\n"
- " -v vacuum all four standard tables before tests\n"
+ " -j, --jobs=NUM number of threads (default: 1)\n"
+ " -l, --log write transaction times to log file\n"
+ " -M, --protocol=simple|extended|prepared\n"
+ " protocol for submitting queries (default: simple)\n"
+ " -n, --no-vacuum do not run VACUUM before tests\n"
+ " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
+ " -P, --progress=NUM show thread progress report every NUM seconds\n"
+ " -r, --report-latencies report average latency per command\n"
+ " -R, --rate=NUM target rate in transactions per second\n"
+ " -s, --scale=NUM report this scale factor in output\n"
+ " -S, --select-only perform SELECT-only transactions\n"
+ " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
+ " -T, --time=NUM duration of benchmark test in seconds\n"
+ " -v, --vacuum-all vacuum all four standard tables before tests\n"
+ " --aggregate-interval=NUM aggregate data over NUM seconds\n"
+ " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n"
"\nCommon options:\n"
- " -d print debugging output\n"
- " -h HOSTNAME database server host or socket directory\n"
- " -p PORT database server port number\n"
- " -U USERNAME connect as specified database user\n"
- " --help show this help, then exit\n"
- " --version output version information, then exit\n"
+ " -d, --debug print debugging output\n"
+ " -h, --host=HOSTNAME database server host or socket directory\n"
+ " -p, --port=PORT database server port number\n"
+ " -U, --username=USERNAME connect as specified database user\n"
+ " -V, --version output version information, then exit\n"
+ " -?, --help show this help, then exit\n"
"\n"
"Report bugs to <pgsql-bugs@postgresql.org>.\n",
progname, progname);
}
+/*
+ * strtoint64 -- convert a string to 64-bit integer
+ *
+ * This function is a modified version of scanint8() from
+ * src/backend/utils/adt/int8.c.
+ */
+static int64
+strtoint64(const char *str)
+{
+ const char *ptr = str;
+ int64 result = 0;
+ int sign = 1;
+
+ /*
+ * Do our own scan, rather than relying on sscanf which might be broken
+ * for long long.
+ */
+
+ /* skip leading spaces */
+ while (*ptr && isspace((unsigned char) *ptr))
+ ptr++;
+
+ /* handle sign */
+ if (*ptr == '-')
+ {
+ ptr++;
+
+ /*
+ * Do an explicit check for INT64_MIN. Ugly though this is, it's
+ * cleaner than trying to get the loop below to handle it portably.
+ */
+ if (strncmp(ptr, "9223372036854775808", 19) == 0)
+ {
+ result = -INT64CONST(0x7fffffffffffffff) - 1;
+ ptr += 19;
+ goto gotdigits;
+ }
+ sign = -1;
+ }
+ else if (*ptr == '+')
+ ptr++;
+
+ /* require at least one digit */
+ if (!isdigit((unsigned char) *ptr))
+ fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
+
+ /* process digits */
+ while (*ptr && isdigit((unsigned char) *ptr))
+ {
+ int64 tmp = result * 10 + (*ptr++ - '0');
+
+ if ((tmp / 10) != result) /* overflow? */
+ fprintf(stderr, "value \"%s\" is out of range for type bigint\n", str);
+ result = tmp;
+ }
+
+gotdigits:
+
+ /* allow trailing whitespace, but not other trailing chars */
+ while (*ptr != '\0' && isspace((unsigned char) *ptr))
+ ptr++;
+
+ if (*ptr != '\0')
+ fprintf(stderr, "invalid input syntax for integer: \"%s\"\n", str);
+
+ return ((sign < 0) ? -result : result);
+}
+
/* random number generator: uniform distribution from min to max inclusive */
-static int
-getrand(TState *thread, int min, int max)
+static int64
+getrand(TState *thread, int64 min, int64 max)
{
/*
* Odd coding is so that min and max have approximately the same chance of
@@ -438,7 +514,7 @@ getrand(TState *thread, int min, int max)
* protected by a mutex, and therefore a bottleneck on machines with many
* CPUs.
*/
- return min + (int) ((max - min + 1) * pg_erand48(thread->random_state));
+ return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
}
/* call PQexec() and exit() on failure */
@@ -470,10 +546,30 @@ doConnect(void)
*/
do
{
+#define PARAMS_ARRAY_SIZE 7
+
+ const char *keywords[PARAMS_ARRAY_SIZE];
+ const char *values[PARAMS_ARRAY_SIZE];
+
+ keywords[0] = "host";
+ values[0] = pghost;
+ keywords[1] = "port";
+ values[1] = pgport;
+ keywords[2] = "user";
+ values[2] = login;
+ keywords[3] = "password";
+ values[3] = password;
+ keywords[4] = "dbname";
+ values[4] = dbName;
+ keywords[5] = "fallback_application_name";
+ values[5] = progname;
+ keywords[6] = NULL;
+ values[6] = NULL;
+
new_pass = false;
- conn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName,
- login, password);
+ conn = PQconnectdbParams(keywords, values, true);
+
if (!conn)
{
fprintf(stderr, "Connection to database \"%s\" failed\n",
@@ -593,17 +689,17 @@ putVariable(CState *st, const char *context, char *name, char *value)
}
if (st->variables)
- newvars = (Variable *) xrealloc(st->variables,
+ newvars = (Variable *) pg_realloc(st->variables,
(st->nvariables + 1) * sizeof(Variable));
else
- newvars = (Variable *) xmalloc(sizeof(Variable));
+ newvars = (Variable *) pg_malloc(sizeof(Variable));
st->variables = newvars;
var = &newvars[st->nvariables];
- var->name = xstrdup(name);
- var->value = xstrdup(value);
+ var->name = pg_strdup(name);
+ var->value = pg_strdup(value);
st->nvariables++;
@@ -615,7 +711,7 @@ putVariable(CState *st, const char *context, char *name, char *value)
char *val;
/* dup then free, in case value is pointing at this variable */
- val = xstrdup(value);
+ val = pg_strdup(value);
free(var->value);
var->value = val;
@@ -637,7 +733,7 @@ parseVariable(const char *sql, int *eaten)
if (i == 1)
return NULL;
- name = xmalloc(i);
+ name = pg_malloc(i);
memcpy(name, &sql[1], i - 1);
name[i - 1] = '\0';
@@ -654,7 +750,7 @@ replaceVariable(char **sql, char *param, int len, char *value)
{
size_t offset = param - *sql;
- *sql = xrealloc(*sql, strlen(*sql) - len + valueln + 1);
+ *sql = pg_realloc(*sql, strlen(*sql) - len + valueln + 1);
param = *sql + offset;
}
@@ -836,23 +932,90 @@ clientDone(CState *st, bool ok)
return false; /* always false */
}
+static
+void
+agg_vals_init(AggVals *aggs, instr_time start)
+{
+ /* basic counters */
+ aggs->cnt = 0; /* number of transactions */
+ aggs->sum = 0; /* SUM(duration) */
+ aggs->sum2 = 0; /* SUM(duration*duration) */
+
+ /* min and max transaction duration */
+ aggs->min_duration = 0;
+ aggs->max_duration = 0;
+
+ /* start of the current interval */
+ aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
+}
+
/* return false iff client should be disconnected */
static bool
-doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile)
+doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
{
PGresult *res;
Command **commands;
+ bool trans_needs_throttle = false;
top:
commands = sql_files[st->use_file];
+ /*
+ * Handle throttling once per transaction by sleeping. It is simpler to
+ * do this here rather than at the end, because so much complicated logic
+ * happens below when statements finish.
+ */
+ if (throttle_delay && !st->is_throttled)
+ {
+ /*
+ * Use inverse transform sampling to randomly generate a delay, such
+ * that the series of delays will approximate a Poisson distribution
+ * centered on the throttle_delay time.
+ *
+ * 10000 implies a 9.2 (-log(1/10000)) to 0.0 (log 1) delay
+ * multiplier, and results in a 0.055 % target underestimation bias:
+ *
+ * SELECT 1.0/AVG(-LN(i/10000.0)) FROM generate_series(1,10000) AS i;
+ * = 1.000552717032611116335474
+ *
+ * If transactions are too slow or a given wait is shorter than a
+ * transaction, the next transaction will start right away.
+ */
+ int64 wait = (int64) (throttle_delay *
+ 1.00055271703 * -log(getrand(thread, 1, 10000) / 10000.0));
+
+ thread->throttle_trigger += wait;
+
+ st->until = thread->throttle_trigger;
+ st->sleeping = 1;
+ st->throttling = true;
+ st->is_throttled = true;
+ if (debug)
+ fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+ st->id, wait);
+ }
+
if (st->sleeping)
{ /* are we sleeping? */
instr_time now;
+ int64 now_us;
INSTR_TIME_SET_CURRENT(now);
- if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+ now_us = INSTR_TIME_GET_MICROSEC(now);
+ if (st->until <= now_us)
+ {
st->sleeping = 0; /* Done sleeping, go ahead with next command */
+ if (st->throttling)
+ {
+ /* Measure lag of throttled transaction relative to target */
+ int64 lag = now_us - st->until;
+
+ thread->throttle_lag += lag;
+ if (lag > thread->throttle_lag_max)
+ thread->throttle_lag_max = lag;
+ st->throttling = false;
+ }
+ }
else
return true; /* Still sleeping, nothing to do here */
}
@@ -887,6 +1050,27 @@ top:
thread->exec_count[cnum]++;
}
+ /* transaction finished: record latency under progress or throttling */
+ if ((progress || throttle_delay) && commands[st->state + 1] == NULL)
+ {
+ instr_time diff;
+ int64 latency;
+
+ INSTR_TIME_SET_CURRENT(diff);
+ INSTR_TIME_SUBTRACT(diff, st->txn_begin);
+ latency = INSTR_TIME_GET_MICROSEC(diff);
+ st->txn_latencies += latency;
+
+ /*
+ * XXX In a long benchmark run of high-latency transactions, this
+ * int64 addition eventually overflows. For example, 100 threads
+ * running 10s transactions will overflow it in 2.56 hours. With
+ * a more-typical OLTP workload of .1s transactions, overflow
+ * would take 256 hours.
+ */
+ st->txn_sqlats += latency * latency;
+ }
+
/*
* if transaction finished, record the time it took in the log
*/
@@ -896,21 +1080,105 @@ top:
instr_time diff;
double usec;
- INSTR_TIME_SET_CURRENT(now);
- diff = now;
- INSTR_TIME_SUBTRACT(diff, st->txn_begin);
- usec = (double) INSTR_TIME_GET_MICROSEC(diff);
+ /*
+ * write the log entry if this row belongs to the random sample,
+ * or no sampling rate was given which means log everything.
+ */
+ if (sample_rate == 0.0 ||
+ pg_erand48(thread->random_state) <= sample_rate)
+ {
+ INSTR_TIME_SET_CURRENT(now);
+ diff = now;
+ INSTR_TIME_SUBTRACT(diff, st->txn_begin);
+ usec = (double) INSTR_TIME_GET_MICROSEC(diff);
+
+ /* should we aggregate the results or not? */
+ if (agg_interval > 0)
+ {
+ /*
+ * are we still in the same interval? if yes, accumulate
+ * the values (print them otherwise)
+ */
+ if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(now))
+ {
+ agg->cnt += 1;
+ agg->sum += usec;
+ agg->sum2 += usec * usec;
+
+ /* first in this aggregation interval */
+ if ((agg->cnt == 1) || (usec < agg->min_duration))
+ agg->min_duration = usec;
+ if ((agg->cnt == 1) || (usec > agg->max_duration))
+ agg->max_duration = usec;
+ }
+ else
+ {
+ /*
+ * Loop until we reach the interval of the current
+ * transaction (and print all the empty intervals in
+ * between).
+ */
+ while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(now))
+ {
+ /*
+ * This is a non-Windows branch (thanks to the
+ * ifdef in usage), so we don't need to handle
+ * this in a special way (see below).
+ */
+ fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f\n",
+ agg->start_time,
+ agg->cnt,
+ agg->sum,
+ agg->sum2,
+ agg->min_duration,
+ agg->max_duration);
+
+ /* move to the next inteval */
+ agg->start_time = agg->start_time + agg_interval;
+
+ /* reset for "no transaction" intervals */
+ agg->cnt = 0;
+ agg->min_duration = 0;
+ agg->max_duration = 0;
+ agg->sum = 0;
+ agg->sum2 = 0;
+ }
+
+ /*
+ * and now update the reset values (include the
+ * current)
+ */
+ agg->cnt = 1;
+ agg->min_duration = usec;
+ agg->max_duration = usec;
+ agg->sum = usec;
+ agg->sum2 = usec * usec;
+ }
+ }
+ else
+ {
+ /* no, print raw transactions */
#ifndef WIN32
- /* This is more than we really ought to know about instr_time */
- fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
- st->id, st->cnt, usec, st->use_file,
- (long) now.tv_sec, (long) now.tv_usec);
+
+ /*
+ * This is more than we really ought to know about
+ * instr_time
+ */
+ fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
+ st->id, st->cnt, usec, st->use_file,
+ (long) now.tv_sec, (long) now.tv_usec);
#else
- /* On Windows, instr_time doesn't provide a timestamp anyway */
- fprintf(logfile, "%d %d %.0f %d 0 0\n",
- st->id, st->cnt, usec, st->use_file);
+
+ /*
+ * On Windows, instr_time doesn't provide a timestamp
+ * anyway
+ */
+ fprintf(logfile, "%d %d %.0f %d 0 0\n",
+ st->id, st->cnt, usec, st->use_file);
#endif
+ }
+ }
}
if (commands[st->state]->type == SQL_COMMAND)
@@ -953,8 +1221,19 @@ top:
if (commands[st->state] == NULL)
{
st->state = 0;
- st->use_file = getrand(thread, 0, num_files - 1);
+ st->use_file = (int) getrand(thread, 0, num_files - 1);
commands = sql_files[st->use_file];
+ st->is_throttled = false;
+
+ /*
+ * No transaction is underway anymore, which means there is
+ * nothing to listen to right now. When throttling rate limits
+ * are active, a sleep will happen next, as the next transaction
+ * starts. And then in any case the next SQL command will set
+ * listen back to 1.
+ */
+ st->listen = 0;
+ trans_needs_throttle = (throttle_delay > 0);
}
}
@@ -973,8 +1252,19 @@ top:
INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
}
- /* Record transaction start time if logging is enabled */
- if (logfile && st->state == 0)
+ /*
+ * This ensures that a throttling delay is inserted before proceeding with
+ * sql commands, after the first transaction. The first transaction
+ * throttling is performed when first entering doCustom.
+ */
+ if (trans_needs_throttle)
+ {
+ trans_needs_throttle = false;
+ goto top;
+ }
+
+ /* Record transaction start time under logging, progress or throttling */
+ if ((logfile || progress || throttle_delay) && st->state == 0)
INSTR_TIME_SET_CURRENT(st->txn_begin);
/* Record statement start time if per-command latencies are requested */
@@ -990,7 +1280,7 @@ top:
{
char *sql;
- sql = xstrdup(command->argv[0]);
+ sql = pg_strdup(command->argv[0]);
sql = assignVariables(st, sql);
if (debug)
@@ -1073,7 +1363,7 @@ top:
if (pg_strcasecmp(argv[0], "setrandom") == 0)
{
char *var;
- int min,
+ int64 min,
max;
char res[64];
@@ -1085,10 +1375,10 @@ top:
st->ecnt++;
return true;
}
- min = atoi(var);
+ min = strtoint64(var);
}
else
- min = atoi(argv[2]);
+ min = strtoint64(argv[2]);
#ifdef NOT_USED
if (min < 0)
@@ -1107,10 +1397,10 @@ top:
st->ecnt++;
return true;
}
- max = atoi(var);
+ max = strtoint64(var);
}
else
- max = atoi(argv[3]);
+ max = strtoint64(argv[3]);
if (max < min)
{
@@ -1120,11 +1410,11 @@ top:
}
/*
- * getrand() neeeds to be able to subtract max from min and add
- * one the result without overflowing. Since we know max > min,
- * we can detect overflow just by checking for a negative result.
- * But we must check both that the subtraction doesn't overflow,
- * and that adding one to the result doesn't overflow either.
+ * getrand() needs to be able to subtract max from min and add one
+ * to the result without overflowing. Since we know max > min, we
+ * can detect overflow just by checking for a negative result. But
+ * we must check both that the subtraction doesn't overflow, and
+ * that adding one to the result doesn't overflow either.
*/
if (max - min < 0 || (max - min) + 1 < 0)
{
@@ -1134,9 +1424,9 @@ top:
}
#ifdef DEBUG
- printf("min: %d max: %d random: %d\n", min, max, getrand(thread, min, max));
+ printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max));
#endif
- snprintf(res, sizeof(res), "%d", getrand(thread, min, max));
+ snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max));
if (!putVariable(st, argv[0], argv[1], res))
{
@@ -1149,7 +1439,7 @@ top:
else if (pg_strcasecmp(argv[0], "set") == 0)
{
char *var;
- int ope1,
+ int64 ope1,
ope2;
char res[64];
@@ -1161,13 +1451,13 @@ top:
st->ecnt++;
return true;
}
- ope1 = atoi(var);
+ ope1 = strtoint64(var);
}
else
- ope1 = atoi(argv[2]);
+ ope1 = strtoint64(argv[2]);
if (argc < 5)
- snprintf(res, sizeof(res), "%d", ope1);
+ snprintf(res, sizeof(res), INT64_FORMAT, ope1);
else
{
if (*argv[4] == ':')
@@ -1178,17 +1468,17 @@ top:
st->ecnt++;
return true;
}
- ope2 = atoi(var);
+ ope2 = strtoint64(var);
}
else
- ope2 = atoi(argv[4]);
+ ope2 = strtoint64(argv[4]);
if (strcmp(argv[3], "+") == 0)
- snprintf(res, sizeof(res), "%d", ope1 + ope2);
+ snprintf(res, sizeof(res), INT64_FORMAT, ope1 + ope2);
else if (strcmp(argv[3], "-") == 0)
- snprintf(res, sizeof(res), "%d", ope1 - ope2);
+ snprintf(res, sizeof(res), INT64_FORMAT, ope1 - ope2);
else if (strcmp(argv[3], "*") == 0)
- snprintf(res, sizeof(res), "%d", ope1 * ope2);
+ snprintf(res, sizeof(res), INT64_FORMAT, ope1 * ope2);
else if (strcmp(argv[3], "/") == 0)
{
if (ope2 == 0)
@@ -1197,7 +1487,7 @@ top:
st->ecnt++;
return true;
}
- snprintf(res, sizeof(res), "%d", ope1 / ope2);
+ snprintf(res, sizeof(res), INT64_FORMAT, ope1 / ope2);
}
else
{
@@ -1302,31 +1592,44 @@ disconnect_all(CState *state, int length)
/* create tables and setup data */
static void
-init(void)
+init(bool is_no_vacuum)
{
+/*
+ * The scale factor at/beyond which 32-bit integers are insufficient for
+ * storing TPC-B account IDs.
+ *
+ * Although the actual threshold is 21474, we use 20000 because it is easier to
+ * document and remember, and isn't that far away from the real threshold.
+ */
+#define SCALE_32BIT_THRESHOLD 20000
+
/*
* Note: TPC-B requires at least 100 bytes per row, and the "filler"
* fields in these table declarations were intended to comply with that.
- * But because they default to NULLs, they don't actually take any space.
- * We could fix that by giving them non-null default values. However, that
+ * The pgbench_accounts table complies with that because the "filler"
+ * column is set to blank-padded empty string. But for all other tables
+ * the columns default to NULL and so don't actually take any space. We
+ * could fix that by giving them non-null default values. However, that
* would completely break comparability of pgbench results with prior
- * versions. Since pgbench has never pretended to be fully TPC-B
- * compliant anyway, we stick with the historical behavior.
+ * versions. Since pgbench has never pretended to be fully TPC-B compliant
+ * anyway, we stick with the historical behavior.
*/
struct ddlinfo
{
- char *table;
- char *cols;
+ const char *table; /* table name */
+ const char *smcols; /* column decls if accountIDs are 32 bits */
+ const char *bigcols; /* column decls if accountIDs are 64 bits */
int declare_fillfactor;
#ifdef PGXC
char *distribute_by;
#endif
};
- struct ddlinfo DDLs[] = {
+ static const struct ddlinfo DDLs[] = {
{
- "pgbench_branches",
- "bid int not null,bbalance int,filler char(88)",
- 1
+ "pgbench_history",
+ "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
+ "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
+ 0
#ifdef PGXC
, "distribute by hash (bid)"
#endif
@@ -1334,6 +1637,7 @@ init(void)
{
"pgbench_tellers",
"tid int not null,bid int,tbalance int,filler char(84)",
+ "tid int not null,bid int,tbalance int,filler char(84)",
1
#ifdef PGXC
, "distribute by hash (bid)"
@@ -1341,27 +1645,35 @@ init(void)
},
{
"pgbench_accounts",
- "aid int not null,bid int,abalance int,filler char(84)",
+ "aid int not null,bid int,abalance int,filler char(84)",
+ "aid bigint not null,bid int,abalance int,filler char(84)",
1
#ifdef PGXC
, "distribute by hash (bid)"
#endif
},
{
- "pgbench_history",
- "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
- 0
+ "pgbench_branches",
+ "bid int not null,bbalance int,filler char(88)",
+ "bid int not null,bbalance int,filler char(88)",
+ 1
#ifdef PGXC
, "distribute by hash (bid)"
#endif
}
};
-
- static char *DDLAFTERs[] = {
+ static const char *const DDLINDEXes[] = {
"alter table pgbench_branches add primary key (bid)",
"alter table pgbench_tellers add primary key (tid)",
"alter table pgbench_accounts add primary key (aid)"
};
+ static const char *const DDLKEYs[] = {
+ "alter table pgbench_tellers add foreign key (bid) references pgbench_branches",
+ "alter table pgbench_accounts add foreign key (bid) references pgbench_branches",
+ "alter table pgbench_history add foreign key (bid) references pgbench_branches",
+ "alter table pgbench_history add foreign key (tid) references pgbench_tellers",
+ "alter table pgbench_history add foreign key (aid) references pgbench_accounts"
+ };
#ifdef PGXC
static char *DDLAFTERs_bid[] = {
@@ -1375,6 +1687,14 @@ init(void)
PGresult *res;
char sql[256];
int i;
+ int64 k;
+
+ /* used to track elapsed time and estimate of the remaining time */
+ instr_time start,
+ diff;
+ double elapsed_sec,
+ remaining_sec;
+ int log_interval = 1;
if ((con = doConnect()) == NULL)
exit(1);
@@ -1383,16 +1703,17 @@ init(void)
{
char opts[256];
char buffer[256];
- struct ddlinfo *ddl = &DDLs[i];
+ const struct ddlinfo *ddl = &DDLs[i];
+ const char *cols;
/* Remove old table, if it exists. */
- snprintf(buffer, 256, "drop table if exists %s", ddl->table);
+ snprintf(buffer, sizeof(buffer), "drop table if exists %s", ddl->table);
executeStatement(con, buffer);
/* Construct new create table statement. */
opts[0] = '\0';
if (ddl->declare_fillfactor)
- snprintf(opts + strlen(opts), 256 - strlen(opts),
+ snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
" with (fillfactor=%d)", fillfactor);
if (tablespace != NULL)
{
@@ -1400,21 +1721,24 @@ init(void)
escape_tablespace = PQescapeIdentifier(con, tablespace,
strlen(tablespace));
- snprintf(opts + strlen(opts), 256 - strlen(opts),
+ snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
" tablespace %s", escape_tablespace);
PQfreemem(escape_tablespace);
}
+
+ cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;
+
#ifdef PGXC
/* Add distribution columns if necessary */
if (use_branch)
- snprintf(buffer, 256, "create%s table %s(%s)%s %s",
+ snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s %s",
unlogged_tables ? " unlogged" : "",
ddl->table, ddl->cols, opts, ddl->distribute_by);
else
#endif
- snprintf(buffer, 256, "create%s table %s(%s)%s",
+ snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
unlogged_tables ? " unlogged" : "",
- ddl->table, ddl->cols, opts);
+ ddl->table, cols, opts);
executeStatement(con, buffer);
}
@@ -1423,13 +1747,18 @@ init(void)
for (i = 0; i < nbranches * scale; i++)
{
- snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1);
+ /* "filler" column defaults to NULL */
+ snprintf(sql, sizeof(sql),
+ "insert into pgbench_branches(bid,bbalance) values(%d,0)",
+ i + 1);
executeStatement(con, sql);
}
for (i = 0; i < ntellers * scale; i++)
{
- snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
+ /* "filler" column defaults to NULL */
+ snprintf(sql, sizeof(sql),
+ "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
i + 1, i / ntellers + 1);
executeStatement(con, sql);
}
@@ -1452,19 +1781,60 @@ init(void)
}
PQclear(res);
- for (i = 0; i < naccounts * scale; i++)
+ INSTR_TIME_SET_CURRENT(start);
+
+ for (k = 0; k < (int64) naccounts * scale; k++)
{
- int j = i + 1;
+ int64 j = k + 1;
- snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0);
+ /* "filler" column defaults to blank padded empty string */
+ snprintf(sql, sizeof(sql),
+ INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",
+ j, k / naccounts + 1, 0);
if (PQputline(con, sql))
{
fprintf(stderr, "PQputline failed\n");
exit(1);
}
- if (j % 10000 == 0)
- fprintf(stderr, "%d tuples done.\n", j);
+ /*
+ * If we want to stick with the original logging, print a message each
+ * 100k inserted rows.
+ */
+ if ((!use_quiet) && (j % 100000 == 0))
+ {
+ INSTR_TIME_SET_CURRENT(diff);
+ INSTR_TIME_SUBTRACT(diff, start);
+
+ elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
+ remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+
+ fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
+ j, (int64) naccounts * scale,
+ (int) (((int64) j * 100) / (naccounts * (int64) scale)),
+ elapsed_sec, remaining_sec);
+ }
+ /* let's not call the timing for each row, but only each 100 rows */
+ else if (use_quiet && (j % 100 == 0))
+ {
+ INSTR_TIME_SET_CURRENT(diff);
+ INSTR_TIME_SUBTRACT(diff, start);
+
+ elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
+ remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+
+ /* have we reached the next interval (or end)? */
+ if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
+ {
+ fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s).\n",
+ j, (int64) naccounts * scale,
+ (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);
+
+ /* skip to the next interval */
+ log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
+ }
+ }
+
}
if (PQputline(con, "\\.\n"))
{
@@ -1478,6 +1848,16 @@ init(void)
}
executeStatement(con, "commit");
+ /* vacuum */
+ if (!is_no_vacuum)
+ {
+ fprintf(stderr, "vacuum...\n");
+ executeStatement(con, "vacuum analyze pgbench_branches");
+ executeStatement(con, "vacuum analyze pgbench_tellers");
+ executeStatement(con, "vacuum analyze pgbench_accounts");
+ executeStatement(con, "vacuum analyze pgbench_history");
+ }
+
/*
* create indexes
*/
@@ -1511,11 +1891,11 @@ init(void)
}
else
#endif
- for (i = 0; i < lengthof(DDLAFTERs); i++)
+ for (i = 0; i < lengthof(DDLINDEXes); i++)
{
char buffer[256];
- strncpy(buffer, DDLAFTERs[i], 256);
+ strlcpy(buffer, DDLINDEXes[i], sizeof(buffer));
if (index_tablespace != NULL)
{
@@ -1523,7 +1903,7 @@ init(void)
escape_tablespace = PQescapeIdentifier(con, index_tablespace,
strlen(index_tablespace));
- snprintf(buffer + strlen(buffer), 256 - strlen(buffer),
+ snprintf(buffer + strlen(buffer), sizeof(buffer) - strlen(buffer),
" using index tablespace %s", escape_tablespace);
PQfreemem(escape_tablespace);
}
@@ -1531,12 +1911,17 @@ init(void)
executeStatement(con, buffer);
}
- /* vacuum */
- fprintf(stderr, "vacuum...");
- executeStatement(con, "vacuum analyze pgbench_branches");
- executeStatement(con, "vacuum analyze pgbench_tellers");
- executeStatement(con, "vacuum analyze pgbench_accounts");
- executeStatement(con, "vacuum analyze pgbench_history");
+ /*
+ * create foreign keys
+ */
+ if (foreign_keys)
+ {
+ fprintf(stderr, "set foreign keys...\n");
+ for (i = 0; i < lengthof(DDLKEYs); i++)
+ {
+ executeStatement(con, DDLKEYs[i]);
+ }
+ }
fprintf(stderr, "done.\n");
PQfinish(con);
@@ -1551,7 +1936,7 @@ parseQuery(Command *cmd, const char *raw_sql)
char *sql,
*p;
- sql = xstrdup(raw_sql);
+ sql = pg_strdup(raw_sql);
cmd->argc = 1;
p = sql;
@@ -1613,8 +1998,8 @@ process_commands(char *buf)
return NULL;
/* Allocate and initialize Command structure */
- my_commands = (Command *) xmalloc(sizeof(Command));
- my_commands->line = xstrdup(buf);
+ my_commands = (Command *) pg_malloc(sizeof(Command));
+ my_commands->line = pg_strdup(buf);
my_commands->command_num = num_commands++;
my_commands->type = 0; /* until set */
my_commands->argc = 0;
@@ -1628,7 +2013,7 @@ process_commands(char *buf)
while (tok != NULL)
{
- my_commands->argv[j++] = xstrdup(tok);
+ my_commands->argv[j++] = pg_strdup(tok);
my_commands->argc++;
tok = strtok(NULL, delim);
}
@@ -1730,7 +2115,7 @@ process_commands(char *buf)
switch (querymode)
{
case QUERY_SIMPLE:
- my_commands->argv[0] = xstrdup(p);
+ my_commands->argv[0] = pg_strdup(p);
my_commands->argc++;
break;
case QUERY_EXTENDED:
@@ -1746,6 +2131,49 @@ process_commands(char *buf)
return my_commands;
}
+/*
+ * Read a line from fd, and return it in a malloc'd buffer.
+ * Return NULL at EOF.
+ *
+ * The buffer will typically be larger than necessary, but we don't care
+ * in this program, because we'll free it as soon as we've parsed the line.
+ */
+static char *
+read_line_from_file(FILE *fd)
+{
+ char tmpbuf[BUFSIZ];
+ char *buf;
+ size_t buflen = BUFSIZ;
+ size_t used = 0;
+
+ buf = (char *) palloc(buflen);
+ buf[0] = '\0';
+
+ while (fgets(tmpbuf, BUFSIZ, fd) != NULL)
+ {
+ size_t thislen = strlen(tmpbuf);
+
+ /* Append tmpbuf to whatever we had already */
+ memcpy(buf + used, tmpbuf, thislen + 1);
+ used += thislen;
+
+ /* Done if we collected a newline */
+ if (thislen > 0 && tmpbuf[thislen - 1] == '\n')
+ break;
+
+ /* Else, enlarge buf to ensure we can append next bufferload */
+ buflen += BUFSIZ;
+ buf = (char *) pg_realloc(buf, buflen);
+ }
+
+ if (used > 0)
+ return buf;
+
+ /* Reached EOF */
+ free(buf);
+ return NULL;
+}
+
static int
process_file(char *filename)
{
@@ -1754,7 +2182,7 @@ process_file(char *filename)
Command **my_commands;
FILE *fd;
int lineno;
- char buf[BUFSIZ];
+ char *buf;
int alloc_num;
if (num_files >= MAX_FILES)
@@ -1764,7 +2192,7 @@ process_file(char *filename)
}
alloc_num = COMMANDS_ALLOC_NUM;
- my_commands = (Command **) xmalloc(sizeof(Command *) * alloc_num);
+ my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
if (strcmp(filename, "-") == 0)
fd = stdin;
@@ -1776,11 +2204,14 @@ process_file(char *filename)
lineno = 0;
- while (fgets(buf, sizeof(buf), fd) != NULL)
+ while ((buf = read_line_from_file(fd)) != NULL)
{
Command *command;
command = process_commands(buf);
+
+ free(buf);
+
if (command == NULL)
continue;
@@ -1790,7 +2221,7 @@ process_file(char *filename)
if (lineno >= alloc_num)
{
alloc_num += COMMANDS_ALLOC_NUM;
- my_commands = xrealloc(my_commands, sizeof(Command *) * alloc_num);
+ my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
}
}
fclose(fd);
@@ -1813,7 +2244,7 @@ process_builtin(char *tb)
int alloc_num;
alloc_num = COMMANDS_ALLOC_NUM;
- my_commands = (Command **) xmalloc(sizeof(Command *) * alloc_num);
+ my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
lineno = 0;
@@ -1844,7 +2275,7 @@ process_builtin(char *tb)
if (lineno >= alloc_num)
{
alloc_num += COMMANDS_ALLOC_NUM;
- my_commands = xrealloc(my_commands, sizeof(Command *) * alloc_num);
+ my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num);
}
}
@@ -1855,9 +2286,11 @@ process_builtin(char *tb)
/* print out results */
static void
-printResults(int ttype, int normal_xacts, int nclients,
+printResults(int ttype, int64 normal_xacts, int nclients,
TState *threads, int nthreads,
- instr_time total_time, instr_time conn_total_time)
+ instr_time total_time, instr_time conn_total_time,
+ int64 total_latencies, int64 total_sqlats,
+ int64 throttle_lag, int64 throttle_lag_max)
{
double time_include,
tps_include,
@@ -1886,15 +2319,45 @@ printResults(int ttype, int normal_xacts, int nclients,
if (duration <= 0)
{
printf("number of transactions per client: %d\n", nxacts);
- printf("number of transactions actually processed: %d/%d\n",
- normal_xacts, nxacts * nclients);
+ printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
+ normal_xacts, (int64) nxacts * nclients);
}
else
{
printf("duration: %d s\n", duration);
- printf("number of transactions actually processed: %d\n",
+ printf("number of transactions actually processed: " INT64_FORMAT "\n",
normal_xacts);
}
+
+ if (throttle_delay || progress)
+ {
+ /* compute and show latency average and standard deviation */
+ double latency = 0.001 * total_latencies / normal_xacts;
+ double sqlat = (double) total_sqlats / normal_xacts;
+
+ printf("latency average: %.3f ms\n"
+ "latency stddev: %.3f ms\n",
+ latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
+ }
+ else
+ {
+ /* only an average latency computed from the duration is available */
+ printf("latency average: %.3f ms\n",
+ 1000.0 * duration * nclients / normal_xacts);
+ }
+
+ if (throttle_delay)
+ {
+ /*
+ * Report average transaction lag under rate limit throttling. This
+ * is the delay between scheduled and actual start times for the
+ * transaction. The measured lag may be caused by thread/client load,
+ * the database load, or the Poisson throttling process.
+ */
+ printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
+ 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+ }
+
printf("tps = %f (including connections establishing)\n", tps_include);
printf("tps = %f (excluding connections establishing)\n", tps_exclude);
@@ -1948,6 +2411,42 @@ printResults(int ttype, int normal_xacts, int nclients,
int
main(int argc, char **argv)
{
+ static struct option long_options[] = {
+ /* systematic long/short named options */
+ {"client", required_argument, NULL, 'c'},
+ {"connect", no_argument, NULL, 'C'},
+ {"debug", no_argument, NULL, 'd'},
+ {"define", required_argument, NULL, 'D'},
+ {"file", required_argument, NULL, 'f'},
+ {"fillfactor", required_argument, NULL, 'F'},
+ {"host", required_argument, NULL, 'h'},
+ {"initialize", no_argument, NULL, 'i'},
+ {"jobs", required_argument, NULL, 'j'},
+ {"log", no_argument, NULL, 'l'},
+ {"no-vacuum", no_argument, NULL, 'n'},
+ {"port", required_argument, NULL, 'p'},
+ {"progress", required_argument, NULL, 'P'},
+ {"protocol", required_argument, NULL, 'M'},
+ {"quiet", no_argument, NULL, 'q'},
+ {"report-latencies", no_argument, NULL, 'r'},
+ {"scale", required_argument, NULL, 's'},
+ {"select-only", no_argument, NULL, 'S'},
+ {"skip-some-updates", no_argument, NULL, 'N'},
+ {"time", required_argument, NULL, 'T'},
+ {"transactions", required_argument, NULL, 't'},
+ {"username", required_argument, NULL, 'U'},
+ {"vacuum-all", no_argument, NULL, 'v'},
+ /* long-named only options */
+ {"foreign-keys", no_argument, &foreign_keys, 1},
+ {"index-tablespace", required_argument, NULL, 3},
+ {"tablespace", required_argument, NULL, 2},
+ {"unlogged-tables", no_argument, &unlogged_tables, 1},
+ {"sampling-rate", required_argument, NULL, 4},
+ {"aggregate-interval", required_argument, NULL, 5},
+ {"rate", required_argument, NULL, 'R'},
+ {NULL, 0, NULL, 0}
+ };
+
int c;
int nclients = 1; /* default number of simulated clients */
int nthreads = 1; /* default number of threads */
@@ -1966,17 +2465,14 @@ main(int argc, char **argv)
instr_time start_time; /* start up time */
instr_time total_time;
instr_time conn_total_time;
- int total_xacts;
+ int64 total_xacts = 0;
+ int64 total_latencies = 0;
+ int64 total_sqlats = 0;
+ int64 throttle_lag = 0;
+ int64 throttle_lag_max = 0;
int i;
- static struct option long_options[] = {
- {"index-tablespace", required_argument, NULL, 3},
- {"tablespace", required_argument, NULL, 2},
- {"unlogged-tables", no_argument, &unlogged_tables, 1},
- {NULL, 0, NULL, 0}
- };
-
#ifdef HAVE_GETRLIMIT
struct rlimit rlim;
#endif
@@ -1987,15 +2483,13 @@ main(int argc, char **argv)
char val[64];
- const char *progname;
-
progname = get_progname(argv[0]);
if (argc > 1)
{
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
{
- usage(progname);
+ usage();
exit(0);
}
if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
@@ -2017,13 +2511,13 @@ main(int argc, char **argv)
else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
login = env;
- state = (CState *) xmalloc(sizeof(CState));
+ state = (CState *) pg_malloc(sizeof(CState));
memset(state, 0, sizeof(CState));
#ifdef PGXC
- while ((c = getopt_long(argc, argv, "ih:knvp:dSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "ih:knvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
#else
- while ((c = getopt_long(argc, argv, "ih:nvp:dSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+ while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
#endif
{
switch (c)
@@ -2037,7 +2531,7 @@ main(int argc, char **argv)
break;
#endif
case 'h':
- pghost = optarg;
+ pghost = pg_strdup(optarg);
break;
case 'n':
is_no_vacuum++;
@@ -2046,7 +2540,7 @@ main(int argc, char **argv)
do_vacuum_accounts++;
break;
case 'p':
- pgport = optarg;
+ pgport = pg_strdup(optarg);
break;
case 'd':
debug++;
@@ -2132,14 +2626,17 @@ main(int argc, char **argv)
}
break;
case 'U':
- login = optarg;
+ login = pg_strdup(optarg);
break;
case 'l':
use_log = true;
break;
+ case 'q':
+ use_quiet = true;
+ break;
case 'f':
ttype = 3;
- filename = optarg;
+ filename = pg_strdup(optarg);
if (process_file(filename) == false || *sql_files[num_files - 1] == NULL)
exit(1);
break;
@@ -2181,14 +2678,59 @@ main(int argc, char **argv)
exit(1);
}
break;
+ case 'P':
+ progress = atoi(optarg);
+ if (progress <= 0)
+ {
+ fprintf(stderr,
+ "thread progress delay (-P) must be positive (%s)\n",
+ optarg);
+ exit(1);
+ }
+ break;
+ case 'R':
+ {
+ /* get a double from the beginning of option value */
+ double throttle_value = atof(optarg);
+
+ if (throttle_value <= 0.0)
+ {
+ fprintf(stderr, "invalid rate limit: %s\n", optarg);
+ exit(1);
+ }
+ /* Invert rate limit into a time offset */
+ throttle_delay = (int64) (1000000.0 / throttle_value);
+ }
+ break;
case 0:
/* This covers long options which take no argument. */
break;
case 2: /* tablespace */
- tablespace = optarg;
+ tablespace = pg_strdup(optarg);
break;
case 3: /* index-tablespace */
- index_tablespace = optarg;
+ index_tablespace = pg_strdup(optarg);
+ break;
+ case 4:
+ sample_rate = atof(optarg);
+ if (sample_rate <= 0.0 || sample_rate > 1.0)
+ {
+ fprintf(stderr, "invalid sampling rate: %f\n", sample_rate);
+ exit(1);
+ }
+ break;
+ case 5:
+#ifdef WIN32
+ fprintf(stderr, "--aggregate-interval is not currently supported on Windows");
+ exit(1);
+#else
+ agg_interval = atoi(optarg);
+ if (agg_interval <= 0)
+ {
+ fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval);
+ exit(1);
+ }
+#endif
break;
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
@@ -2197,6 +2739,9 @@ main(int argc, char **argv)
}
}
+ /* compute a per thread delay */
+ throttle_delay *= nthreads;
+
if (argc > optind)
dbName = argv[optind];
else
@@ -2211,7 +2756,7 @@ main(int argc, char **argv)
if (is_init_mode)
{
- init();
+ init(is_no_vacuum);
exit(0);
}
@@ -2225,6 +2770,45 @@ main(int argc, char **argv)
exit(1);
}
+ /* --sampling-rate may be used only with -l */
+ if (sample_rate > 0.0 && !use_log)
+ {
+ fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n");
+ exit(1);
+ }
+
+ /* -q may be used only with -i */
+ if (use_quiet && !is_init_mode)
+ {
+ fprintf(stderr, "quiet-logging is allowed only in initialization mode (-i)\n");
+ exit(1);
+ }
+
+ /* --sampling-rate may must not be used with --aggregate-interval */
+ if (sample_rate > 0.0 && agg_interval > 0)
+ {
+ fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n");
+ exit(1);
+ }
+
+ if (agg_interval > 0 && (!use_log))
+ {
+ fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n");
+ exit(1);
+ }
+
+ if ((duration > 0) && (agg_interval > duration))
+ {
+ fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration);
+ exit(1);
+ }
+
+ if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0))
+ {
+ fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval);
+ exit(1);
+ }
+
/*
* is_latencies only works with multiple threads in thread-based
* implementations, not fork-based ones, because it supposes that the
@@ -2246,10 +2830,12 @@ main(int argc, char **argv)
* changed after fork.
*/
main_pid = (int) getpid();
+ progress_nclients = nclients;
+ progress_nthreads = nthreads;
if (nclients > 1)
{
- state = (CState *) xrealloc(state, sizeof(CState) * nclients);
+ state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
memset(state + 1, 0, sizeof(CState) * (nclients - 1));
/* copy any -D switch values to all clients */
@@ -2329,6 +2915,20 @@ main(int argc, char **argv)
}
}
+ /*
+ * Define a :client_id variable that is unique per connection. But don't
+ * override an explicit -D switch.
+ */
+ if (getVariable(&state[0], "client_id") == NULL)
+ {
+ for (i = 0; i < nclients; i++)
+ {
+ snprintf(val, sizeof(val), "%d", i);
+ if (!putVariable(&state[i], "startup", "client_id", val))
+ exit(1);
+ }
+ }
+
if (!is_no_vacuum)
{
fprintf(stderr, "starting vacuum...");
@@ -2383,7 +2983,7 @@ main(int argc, char **argv)
}
/* set up thread data structures */
- threads = (TState *) xmalloc(sizeof(TState) * nthreads);
+ threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
@@ -2401,9 +3001,9 @@ main(int argc, char **argv)
int t;
thread->exec_elapsed = (instr_time *)
- xmalloc(sizeof(instr_time) * num_commands);
+ pg_malloc(sizeof(instr_time) * num_commands);
thread->exec_count = (int *)
- xmalloc(sizeof(int) * num_commands);
+ pg_malloc(sizeof(int) * num_commands);
for (t = 0; t < num_commands; t++)
{
@@ -2450,7 +3050,6 @@ main(int argc, char **argv)
}
/* wait for threads and accumulate results */
- total_xacts = 0;
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
@@ -2466,17 +3065,32 @@ main(int argc, char **argv)
TResult *r = (TResult *) ret;
total_xacts += r->xacts;
+ total_latencies += r->latencies;
+ total_sqlats += r->sqlats;
+ throttle_lag += r->throttle_lag;
+ if (r->throttle_lag_max > throttle_lag_max)
+ throttle_lag_max = r->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, r->conn_time);
free(ret);
}
}
disconnect_all(state, nclients);
- /* get end time */
+ /*
+ * XXX We compute results as though every client of every thread started
+ * and finished at the same time. That model can diverge noticeably from
+ * reality for a short benchmark run involving relatively many threads.
+ * The first thread may process notably many transactions before the last
+ * thread begins. Improving the model alone would bring limited benefit,
+ * because performance during those periods of partial thread count can
+ * easily exceed steady state performance. This is one of the many ways
+ * short runs convey deceptive performance figures.
+ */
INSTR_TIME_SET_CURRENT(total_time);
INSTR_TIME_SUBTRACT(total_time, start_time);
printResults(ttype, total_xacts, nclients, threads, nthreads,
- total_time, conn_total_time);
+ total_time, conn_total_time, total_latencies, total_sqlats,
+ throttle_lag, throttle_lag_max);
return 0;
}
@@ -2494,7 +3108,30 @@ threadRun(void *arg)
int remains = nstate; /* number of remaining clients */
int i;
- result = xmalloc(sizeof(TResult));
+ /* for reporting progress: */
+ int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
+ int64 last_report = thread_start;
+ int64 next_report = last_report + (int64) progress * 1000000;
+ int64 last_count = 0,
+ last_lats = 0,
+ last_sqlats = 0,
+ last_lags = 0;
+
+ AggVals aggs;
+
+ /*
+ * Initialize throttling rate target for all of the thread's clients. It
+ * might be a little more accurate to reset thread->start_time here too.
+ * The possible drift seems too small relative to typical throttle delay
+ * times to worry about it.
+ */
+ INSTR_TIME_SET_CURRENT(start);
+ thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+ thread->throttle_lag = 0;
+ thread->throttle_lag_max = 0;
+
+ result = pg_malloc(sizeof(TResult));
+
INSTR_TIME_SET_ZERO(result->conn_time);
/* open log file if requested */
@@ -2529,6 +3166,8 @@ threadRun(void *arg)
INSTR_TIME_SET_CURRENT(result->conn_time);
INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+ agg_vals_init(&aggs, thread->start_time);
+
/* send start up queries in async manner */
for (i = 0; i < nstate; i++)
{
@@ -2537,7 +3176,7 @@ threadRun(void *arg)
int prev_ecnt = st->ecnt;
st->use_file = getrand(thread, 0, num_files - 1);
- if (!doCustom(thread, st, &result->conn_time, logfile))
+ if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -2566,25 +3205,38 @@ threadRun(void *arg)
Command **commands = sql_files[st->use_file];
int sock;
- if (st->sleeping)
+ if (st->con == NULL)
{
- int this_usec;
-
- if (min_usec == INT64_MAX)
+ continue;
+ }
+ else if (st->sleeping)
+ {
+ if (st->throttling && timer_exceeded)
{
- instr_time now;
-
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
+ /* interrupt client which has not started a transaction */
+ remains--;
+ st->sleeping = 0;
+ st->throttling = false;
+ PQfinish(st->con);
+ st->con = NULL;
+ continue;
}
+ else /* just a nap from the script */
+ {
+ int this_usec;
- this_usec = st->until - now_usec;
- if (min_usec > this_usec)
- min_usec = this_usec;
- }
- else if (st->con == NULL)
- {
- continue;
+ if (min_usec == INT64_MAX)
+ {
+ instr_time now;
+
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
+ }
+
+ this_usec = st->until - now_usec;
+ if (min_usec > this_usec)
+ min_usec = this_usec;
+ }
}
else if (commands[st->state]->type == META_COMMAND)
{
@@ -2639,7 +3291,7 @@ threadRun(void *arg)
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
- if (!doCustom(thread, st, &result->conn_time, logfile))
+ if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
remains--; /* I've aborted */
}
@@ -2651,14 +3303,141 @@ threadRun(void *arg)
st->con = NULL;
}
}
+
+#ifdef PTHREAD_FORK_EMULATION
+ /* each process reports its own progression */
+ if (progress)
+ {
+ instr_time now_time;
+ int64 now;
+
+ INSTR_TIME_SET_CURRENT(now_time);
+ now = INSTR_TIME_GET_MICROSEC(now_time);
+ if (now >= next_report)
+ {
+ /* generate and show report */
+ int64 count = 0,
+ lats = 0,
+ sqlats = 0;
+ int64 lags = thread->throttle_lag;
+ int64 run = now - last_report;
+ double tps,
+ total_run,
+ latency,
+ sqlat,
+ stdev,
+ lag;
+
+ for (i = 0; i < nstate; i++)
+ {
+ count += state[i].cnt;
+ lats += state[i].txn_latencies;
+ sqlats += state[i].txn_sqlats;
+ }
+
+ total_run = (now - thread_start) / 1000000.0;
+ tps = 1000000.0 * (count - last_count) / run;
+ latency = 0.001 * (lats - last_lats) / (count - last_count);
+ sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
+ stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
+ lag = 0.001 * (lags - last_lags) / (count - last_count);
+
+ if (throttle_delay)
+ fprintf(stderr,
+ "progress %d: %.1f s, %.1f tps, "
+ "lat %.3f ms stddev %.3f, lag %.3f ms\n",
+ thread->tid, total_run, tps, latency, stdev, lag);
+ else
+ fprintf(stderr,
+ "progress %d: %.1f s, %.1f tps, "
+ "lat %.3f ms stddev %.3f\n",
+ thread->tid, total_run, tps, latency, stdev);
+
+ last_count = count;
+ last_lats = lats;
+ last_sqlats = sqlats;
+ last_lags = lags;
+ last_report = now;
+ next_report += (int64) progress *1000000;
+ }
+ }
+#else
+ /* progress report by thread 0 for all threads */
+ if (progress && thread->tid == 0)
+ {
+ instr_time now_time;
+ int64 now;
+
+ INSTR_TIME_SET_CURRENT(now_time);
+ now = INSTR_TIME_GET_MICROSEC(now_time);
+ if (now >= next_report)
+ {
+ /* generate and show report */
+ int64 count = 0,
+ lats = 0,
+ sqlats = 0,
+ lags = 0;
+ int64 run = now - last_report;
+ double tps,
+ total_run,
+ latency,
+ sqlat,
+ lag,
+ stdev;
+
+ for (i = 0; i < progress_nclients; i++)
+ {
+ count += state[i].cnt;
+ lats += state[i].txn_latencies;
+ sqlats += state[i].txn_sqlats;
+ }
+
+ for (i = 0; i < progress_nthreads; i++)
+ lags += thread[i].throttle_lag;
+
+ total_run = (now - thread_start) / 1000000.0;
+ tps = 1000000.0 * (count - last_count) / run;
+ latency = 0.001 * (lats - last_lats) / (count - last_count);
+ sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
+ stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
+ lag = 0.001 * (lags - last_lags) / (count - last_count);
+
+ if (throttle_delay)
+ fprintf(stderr,
+ "progress: %.1f s, %.1f tps, "
+ "lat %.3f ms stddev %.3f, lag %.3f ms\n",
+ total_run, tps, latency, stdev, lag);
+ else
+ fprintf(stderr,
+ "progress: %.1f s, %.1f tps, "
+ "lat %.3f ms stddev %.3f\n",
+ total_run, tps, latency, stdev);
+
+ last_count = count;
+ last_lats = lats;
+ last_sqlats = sqlats;
+ last_lags = lags;
+ last_report = now;
+ next_report += (int64) progress *1000000;
+ }
+ }
+#endif /* PTHREAD_FORK_EMULATION */
}
done:
INSTR_TIME_SET_CURRENT(start);
disconnect_all(state, nstate);
result->xacts = 0;
+ result->latencies = 0;
+ result->sqlats = 0;
for (i = 0; i < nstate; i++)
+ {
result->xacts += state[i].cnt;
+ result->latencies += state[i].txn_latencies;
+ result->sqlats += state[i].txn_sqlats;
+ }
+ result->throttle_lag = thread->throttle_lag;
+ result->throttle_lag_max = thread->throttle_lag_max;
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
if (logfile)
@@ -2666,7 +3445,6 @@ done:
return result;
}
-
/*
* Support for duration option: set timer_exceeded after so many seconds.
*/
@@ -2706,8 +3484,9 @@ pthread_create(pthread_t *thread,
{
fork_pthread *th;
void *ret;
+ int rc;
- th = (fork_pthread *) xmalloc(sizeof(fork_pthread));
+ th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
if (pipe(th->pipes) < 0)
{
free(th);
@@ -2735,7 +3514,8 @@ pthread_create(pthread_t *thread,
setalarm(duration);
ret = start_routine(arg);
- write(th->pipes[1], ret, sizeof(TResult));
+ rc = write(th->pipes[1], ret, sizeof(TResult));
+ (void) rc;
close(th->pipes[1]);
free(th);
exit(0);
@@ -2755,7 +3535,7 @@ pthread_join(pthread_t th, void **thread_return)
if (thread_return != NULL)
{
/* assume result is TResult */
- *thread_return = xmalloc(sizeof(TResult));
+ *thread_return = pg_malloc(sizeof(TResult));
if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
{
free(*thread_return);
@@ -2823,7 +3603,7 @@ pthread_create(pthread_t *thread,
int save_errno;
win32_pthread *th;
- th = (win32_pthread *) xmalloc(sizeof(win32_pthread));
+ th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
th->routine = start_routine;
th->arg = arg;
th->result = NULL;