summaryrefslogtreecommitdiff
path: root/src/bin/pgbench
diff options
context:
space:
mode:
authorPavan Deolasee2017-06-14 05:42:18 +0000
committerPavan Deolasee2017-06-14 05:42:18 +0000
commit15dd5274c323fb93e4e3ea9ad2185aaaec10f79c (patch)
tree9dafb4c7f735d9429ea461dc792933af87493c33 /src/bin/pgbench
parentdfbb88e3bbb526dcb204b456b9e5cfd9d10d0d0a (diff)
parentd5cb3bab564e0927ffac7c8729eacf181a12dd40 (diff)
Merge from PG master upto d5cb3bab564e0927ffac7c8729eacf181a12dd40
This is the result of the "git merge remotes/PGSQL/master" upto the said commit point. We have done some basic analysis, fixed compilation problems etc, but bulk of the logical problems in conflict resolution etc will be handled by subsequent commits.
Diffstat (limited to 'src/bin/pgbench')
-rw-r--r--src/bin/pgbench/exprparse.y2
-rw-r--r--src/bin/pgbench/exprscan.l23
-rw-r--r--src/bin/pgbench/pgbench.c1279
-rw-r--r--src/bin/pgbench/pgbench.h2
4 files changed, 792 insertions, 514 deletions
diff --git a/src/bin/pgbench/exprparse.y b/src/bin/pgbench/exprparse.y
index 0cc665b75b..b3a2d9bfd3 100644
--- a/src/bin/pgbench/exprparse.y
+++ b/src/bin/pgbench/exprparse.y
@@ -4,7 +4,7 @@
* exprparse.y
* bison grammar for a simple expression syntax
*
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/bin/pgbench/exprparse.y
diff --git a/src/bin/pgbench/exprscan.l b/src/bin/pgbench/exprscan.l
index 20891a3b22..dc1367bbdb 100644
--- a/src/bin/pgbench/exprscan.l
+++ b/src/bin/pgbench/exprscan.l
@@ -15,7 +15,7 @@
*
* Note that this lexer operates within the framework created by psqlscan.l,
*
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/bin/pgbench/exprscan.l
@@ -66,6 +66,9 @@ space [ \t\r\f\v]
nonspace [^ \t\r\f\v\n]
newline [\n]
+/* Line continuation marker */
+continuation \\{newline}
+
/* Exclusive states */
%x EXPR
@@ -96,8 +99,20 @@ newline [\n]
return 1;
}
+ /*
+ * We need this rule to avoid returning "word\" instead of recognizing
+ * a continuation marker just after a word:
+ */
+{nonspace}+{continuation} {
+ /* Found "word\\\n", emit and return just "word" */
+ psqlscan_emit(cur_state, yytext, yyleng - 2);
+ return 1;
+ }
+
{space}+ { /* ignore */ }
+{continuation} { /* ignore */ }
+
{newline} {
/* report end of command */
last_was_newline = true;
@@ -138,14 +153,16 @@ newline [\n]
return FUNCTION;
}
+{space}+ { /* ignore */ }
+
+{continuation} { /* ignore */ }
+
{newline} {
/* report end of command */
last_was_newline = true;
return 0;
}
-{space}+ { /* ignore */ }
-
. {
/*
* must strdup yytext so that expr_yyerror_more doesn't
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 364b91ce31..b3b89b43d2 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -5,7 +5,7 @@
* Originally written by Tatsuo Ishii and enhanced by many contributors.
*
* src/bin/pgbench/pgbench.c
- * Copyright (c) 2000-2016, PostgreSQL Global Development Group
+ * Copyright (c) 2000-2017, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
@@ -42,6 +42,7 @@
#include <limits.h>
#include <math.h>
#include <signal.h>
+#include <time.h>
#include <sys/time.h>
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
@@ -183,6 +184,7 @@ char *pghost = "";
char *pgport = "";
char *login = NULL;
char *dbName;
+char *logfile_prefix = NULL;
const char *progname;
#define WSEP '@' /* weight separator */
@@ -229,7 +231,7 @@ typedef struct SimpleStats
*/
typedef struct StatsData
{
- long start_time; /* interval start time, for aggregates */
+ time_t start_time; /* interval start time, for aggregates */
int64 cnt; /* number of transactions */
int64 skipped; /* number of transactions skipped under --rate
* and --latency-limit */
@@ -238,24 +240,95 @@ typedef struct StatsData
} StatsData;
/*
- * Connection state
+ * Connection state machine states.
+ */
+typedef enum
+{
+ /*
+ * The client must first choose a script to execute. Once chosen, it can
+ * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
+ * right away (state CSTATE_START_TX).
+ */
+ CSTATE_CHOOSE_SCRIPT,
+
+ /*
+ * In CSTATE_START_THROTTLE state, we calculate when to begin the next
+ * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
+ * sleeps until that moment. (If throttling is not enabled, doCustom()
+ * falls directly through from CSTATE_START_THROTTLE to CSTATE_START_TX.)
+ */
+ CSTATE_START_THROTTLE,
+ CSTATE_THROTTLE,
+
+ /*
+ * CSTATE_START_TX performs start-of-transaction processing. Establishes
+ * a new connection for the transaction, in --connect mode, and records
+ * the transaction start time.
+ */
+ CSTATE_START_TX,
+
+ /*
+ * We loop through these states, to process each command in the script:
+ *
+ * CSTATE_START_COMMAND starts the execution of a command. On a SQL
+ * command, the command is sent to the server, and we move to
+ * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
+ * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
+ * meta-commands are executed immediately.
+ *
+ * CSTATE_WAIT_RESULT waits until we get a result set back from the server
+ * for the current command.
+ *
+ * CSTATE_SLEEP waits until the end of \sleep.
+ *
+ * CSTATE_END_COMMAND records the end-of-command timestamp, increments the
+ * command counter, and loops back to CSTATE_START_COMMAND state.
+ */
+ CSTATE_START_COMMAND,
+ CSTATE_WAIT_RESULT,
+ CSTATE_SLEEP,
+ CSTATE_END_COMMAND,
+
+ /*
+ * CSTATE_END_TX performs end-of-transaction processing. Calculates
+ * latency, and logs the transaction. In --connect mode, closes the
+ * current connection. Chooses the next script to execute and starts over
+ * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
+ * more work to do.
+ */
+ CSTATE_END_TX,
+
+ /*
+ * Final states. CSTATE_ABORTED means that the script execution was
+ * aborted because a command failed, CSTATE_FINISHED means success.
+ */
+ CSTATE_ABORTED,
+ CSTATE_FINISHED
+} ConnectionStateEnum;
+
+/*
+ * Connection state.
*/
typedef struct
{
PGconn *con; /* connection handle to DB */
int id; /* client No. */
- int state; /* state No. */
- bool listen; /* whether an async query has been sent */
- bool sleeping; /* whether the client is napping */
- bool throttling; /* whether nap is for throttling */
- bool is_throttled; /* whether transaction throttling is done */
+ ConnectionStateEnum state; /* state machine's current state. */
+
+ int use_file; /* index in sql_script for this client */
+ int command; /* command number in script */
+
+ /* client variables */
Variable *variables; /* array of variable definitions */
int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */
+
+ /* various times about current transaction */
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
+ int64 sleep_until; /* scheduled start time of next cmd (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
- int use_file; /* index in sql_scripts for this client */
+
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* per client collected stats */
@@ -382,7 +455,7 @@ static const BuiltinScript builtin_script[] =
static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *);
-static void doLog(TState *thread, CState *st, instr_time *now,
+static void doLog(TState *thread, CState *st,
StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, instr_time *now,
bool skipped, StatsData *agg);
@@ -451,6 +524,8 @@ usage(void)
" --aggregate-interval=NUM aggregate data over NUM seconds\n"
" --progress-timestamp use Unix epoch timestamps for progress\n"
" --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
+ " --log-prefix=PREFIX prefix for transaction time log file\n"
+ " (default: \"pgbench_log\")\n"
"\nCommon options:\n"
" -d, --debug print debugging output\n"
" -h, --host=HOSTNAME database server host or socket directory\n"
@@ -717,7 +792,7 @@ mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
* the given value.
*/
static void
-initStats(StatsData *sd, double start_time)
+initStats(StatsData *sd, time_t start_time)
{
sd->start_time = start_time;
sd->cnt = 0;
@@ -784,8 +859,9 @@ static PGconn *
doConnect(void)
{
PGconn *conn;
- static char *password = NULL;
bool new_pass;
+ static bool have_password = false;
+ static char password[100];
/*
* Start the connection. Loop until we have a password if requested by
@@ -805,7 +881,7 @@ doConnect(void)
keywords[2] = "user";
values[2] = login;
keywords[3] = "password";
- values[3] = password;
+ values[3] = have_password ? password : NULL;
keywords[4] = "dbname";
values[4] = dbName;
keywords[5] = "fallback_application_name";
@@ -826,10 +902,11 @@ doConnect(void)
if (PQstatus(conn) == CONNECTION_BAD &&
PQconnectionNeedsPassword(conn) &&
- password == NULL)
+ !have_password)
{
PQfinish(conn);
- password = simple_prompt("Password: ", 100, false);
+ simple_prompt("Password: ", password, sizeof(password), false);
+ have_password = true;
new_pass = true;
}
} while (new_pass);
@@ -1390,7 +1467,7 @@ evalFunc(TState *thread, CState *st,
Assert(nargs == 1);
fprintf(stderr, "debug(script=%d,command=%d): ",
- st->use_file, st->state + 1);
+ st->use_file, st->command + 1);
if (varg->type == PGBT_INT)
fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
@@ -1741,15 +1818,12 @@ preparedStatementName(char *buffer, int file, int state)
sprintf(buffer, "P%d_%d", file, state);
}
-static bool
-clientDone(CState *st)
+static void
+commandFailed(CState *st, char *message)
{
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
- return false; /* always false */
+ fprintf(stderr,
+ "client %d aborted in command %d of script %d; %s\n",
+ st->id, st->command, st->use_file, message);
}
/* return a script number with a weighted choice. */
@@ -1771,430 +1845,607 @@ chooseScript(TState *thread)
return i - 1;
}
-/* return false iff client should be disconnected */
+/* Send a SQL command, using the chosen querymode */
static bool
-doCustom(TState *thread, CState *st, StatsData *agg)
+sendCommand(CState *st, Command *command)
{
- PGresult *res;
- Command **commands;
- bool trans_needs_throttle = false;
- instr_time now;
+ int r;
- /*
- * gettimeofday() isn't free, so we get the current timestamp lazily the
- * first time it's needed, and reuse the same value throughout this
- * function after that. This also ensures that e.g. the calculated latency
- * reported in the log file and in the totals are the same. Zero means
- * "not set yet". Reset "now" when we step to the next command with "goto
- * top", though.
- */
-top:
- INSTR_TIME_SET_ZERO(now);
+ if (querymode == QUERY_SIMPLE)
+ {
+ char *sql;
- commands = sql_script[st->use_file].commands;
+ sql = pg_strdup(command->argv[0]);
+ sql = assignVariables(st, sql);
- /*
- * Handle throttling once per transaction by sleeping. It is simpler to
- * do this here rather than at the end, because so much complicated logic
- * happens below when statements finish.
- */
- if (throttle_delay && !st->is_throttled)
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQuery(st->con, sql);
+ free(sql);
+ }
+ else if (querymode == QUERY_EXTENDED)
{
- /*
- * Generate a delay such that the series of delays will approximate a
- * Poisson distribution centered on the throttle_delay time.
- *
- * If transactions are too slow or a given wait is shorter than a
- * transaction, the next transaction will start right away.
- */
- int64 wait = getPoissonRand(thread, throttle_delay);
+ const char *sql = command->argv[0];
+ const char *params[MAX_ARGS];
- thread->throttle_trigger += wait;
- st->txn_scheduled = thread->throttle_trigger;
+ getQueryParams(st, command, params);
- /* stop client if next transaction is beyond pgbench end of execution */
- if (duration > 0 && st->txn_scheduled > end_time)
- return clientDone(st);
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQueryParams(st->con, sql, command->argc - 1,
+ NULL, params, NULL, NULL, 0);
+ }
+ else if (querymode == QUERY_PREPARED)
+ {
+ char name[MAX_PREPARE_NAME];
+ const char *params[MAX_ARGS];
- /*
- * If this --latency-limit is used, and this slot is already late so
- * that the transaction will miss the latency limit even if it
- * completed immediately, we skip this time slot and iterate till the
- * next slot that isn't late yet.
- */
- if (latency_limit)
+ if (!st->prepared[st->use_file])
{
- int64 now_us;
+ int j;
+ Command **commands = sql_script[st->use_file].commands;
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- now_us = INSTR_TIME_GET_MICROSEC(now);
- while (thread->throttle_trigger < now_us - latency_limit)
+ for (j = 0; commands[j] != NULL; j++)
{
- processXactStats(thread, st, &now, true, agg);
- /* next rendez-vous */
- wait = getPoissonRand(thread, throttle_delay);
- thread->throttle_trigger += wait;
- st->txn_scheduled = thread->throttle_trigger;
+ PGresult *res;
+ char name[MAX_PREPARE_NAME];
+
+ if (commands[j]->type != SQL_COMMAND)
+ continue;
+ preparedStatementName(name, st->use_file, j);
+ res = PQprepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ fprintf(stderr, "%s", PQerrorMessage(st->con));
+ PQclear(res);
}
+ st->prepared[st->use_file] = true;
}
- st->sleeping = true;
- st->throttling = true;
- st->is_throttled = true;
+ getQueryParams(st, command, params);
+ preparedStatementName(name, st->use_file, st->command);
+
if (debug)
- fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
- st->id, wait);
+ fprintf(stderr, "client %d sending %s\n", st->id, name);
+ r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+ params, NULL, NULL, 0);
}
+ else /* unknown sql mode */
+ r = 0;
- if (st->sleeping)
- { /* are we sleeping? */
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
- return true; /* Still sleeping, nothing to do here */
- /* Else done sleeping, go ahead with next command */
- st->sleeping = false;
- st->throttling = false;
+ if (r == 0)
+ {
+ if (debug)
+ fprintf(stderr, "client %d could not send %s\n",
+ st->id, command->argv[0]);
+ st->ecnt++;
+ return false;
}
+ else
+ return true;
+}
- if (st->listen)
- { /* are we receiver? */
- if (commands[st->state]->type == SQL_COMMAND)
+/*
+ * Parse the argument to a \sleep command, and return the requested amount
+ * of delay, in microseconds. Returns true on success, false on error.
+ */
+static bool
+evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+{
+ char *var;
+ int usec;
+
+ if (*argv[1] == ':')
+ {
+ if ((var = getVariable(st, argv[1] + 1)) == NULL)
{
- if (debug)
- fprintf(stderr, "client %d receiving\n", st->id);
- if (!PQconsumeInput(st->con))
- { /* there's something wrong */
- fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
- return clientDone(st);
- }
- if (PQisBusy(st->con))
- return true; /* don't have the whole result yet */
+ fprintf(stderr, "%s: undefined variable \"%s\"\n",
+ argv[0], argv[1]);
+ return false;
}
+ usec = atoi(var);
+ }
+ else
+ usec = atoi(argv[1]);
- /*
- * command finished: accumulate per-command execution times in
- * thread-local data structure, if per-command latencies are requested
- */
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
+ if (argc > 2)
+ {
+ if (pg_strcasecmp(argv[2], "ms") == 0)
+ usec *= 1000;
+ else if (pg_strcasecmp(argv[2], "s") == 0)
+ usec *= 1000000;
+ }
+ else
+ usec *= 1000000;
- /* XXX could use a mutex here, but we choose not to */
- addToSimpleStats(&commands[st->state]->stats,
- INSTR_TIME_GET_DOUBLE(now) -
- INSTR_TIME_GET_DOUBLE(st->stmt_begin));
- }
+ *usecs = usec;
+ return true;
+}
- /* transaction finished: calculate latency and log the transaction */
- if (commands[st->state + 1] == NULL)
- {
- if (progress || throttle_delay || latency_limit ||
- per_script_stats || use_log)
- processXactStats(thread, st, &now, false, agg);
- else
- thread->stats.cnt++;
- }
+/*
+ * Advance the state machine of a connection, if possible.
+ */
+static void
+doCustom(TState *thread, CState *st, StatsData *agg)
+{
+ PGresult *res;
+ Command *command;
+ instr_time now;
+ bool end_tx_processed = false;
+ int64 wait;
- if (commands[st->state]->type == SQL_COMMAND)
- {
- /*
- * Read and discard the query result; note this is not included in
- * the statement latency numbers.
- */
- res = PQgetResult(st->con);
- switch (PQresultStatus(res))
- {
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- break; /* OK */
- default:
- fprintf(stderr, "client %d aborted in state %d: %s",
- st->id, st->state, PQerrorMessage(st->con));
- PQclear(res);
- return clientDone(st);
- }
- PQclear(res);
- discard_response(st);
- }
+ /*
+ * gettimeofday() isn't free, so we get the current timestamp lazily the
+ * first time it's needed, and reuse the same value throughout this
+ * function after that. This also ensures that e.g. the calculated
+ * latency reported in the log file and in the totals are the same. Zero
+ * means "not set yet". Reset "now" when we execute shell commands or
+ * expressions, which might take a non-negligible amount of time, though.
+ */
+ INSTR_TIME_SET_ZERO(now);
- if (commands[st->state + 1] == NULL)
+ /*
+ * Loop in the state machine, until we have to wait for a result from the
+ * server (or have to sleep, for throttling or for \sleep).
+ *
+ * Note: In the switch-statement below, 'break' will loop back here,
+ * meaning "continue in the state machine". Return is used to return to
+ * the caller.
+ */
+ for (;;)
+ {
+ switch (st->state)
{
- if (is_connect)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
+ /*
+ * Select transaction to run.
+ */
+ case CSTATE_CHOOSE_SCRIPT:
- ++st->cnt;
- if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
- return clientDone(st); /* exit success */
- }
+ st->use_file = chooseScript(thread);
- /* increment state counter */
- st->state++;
- if (commands[st->state] == NULL)
- {
- st->state = 0;
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- st->is_throttled = false;
-
- /*
- * No transaction is underway anymore, which means there is
- * nothing to listen to right now. When throttling rate limits
- * are active, a sleep will happen next, as the next transaction
- * starts. And then in any case the next SQL command will set
- * listen back to true.
- */
- st->listen = false;
- trans_needs_throttle = (throttle_delay > 0);
- }
- }
+ if (debug)
+ fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
+ sql_script[st->use_file].desc);
- if (st->con == NULL)
- {
- instr_time start,
- end;
+ if (throttle_delay > 0)
+ st->state = CSTATE_START_THROTTLE;
+ else
+ st->state = CSTATE_START_TX;
+ break;
- INSTR_TIME_SET_CURRENT(start);
- if ((st->con = doConnect()) == NULL)
- {
- fprintf(stderr, "client %d aborted while establishing connection\n",
- st->id);
- return clientDone(st);
- }
- INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
-
- /* Reset session-local state */
- st->listen = false;
- st->sleeping = false;
- st->throttling = false;
- st->is_throttled = false;
- memset(st->prepared, 0, sizeof(st->prepared));
- }
+ /*
+ * Handle throttling once per transaction by sleeping.
+ */
+ case CSTATE_START_THROTTLE:
- /*
- * This ensures that a throttling delay is inserted before proceeding with
- * sql commands, after the first transaction. The first transaction
- * throttling is performed when first entering doCustom.
- */
- if (trans_needs_throttle)
- {
- trans_needs_throttle = false;
- goto top;
- }
+ /*
+ * Generate a delay such that the series of delays will
+ * approximate a Poisson distribution centered on the
+ * throttle_delay time.
+ *
+ * If transactions are too slow or a given wait is shorter
+ * than a transaction, the next transaction will start right
+ * away.
+ */
+ Assert(throttle_delay > 0);
+ wait = getPoissonRand(thread, throttle_delay);
- /* Record transaction start time under logging, progress or throttling */
- if ((use_log || progress || throttle_delay || latency_limit ||
- per_script_stats) && st->state == 0)
- {
- INSTR_TIME_SET_CURRENT(st->txn_begin);
+ thread->throttle_trigger += wait;
+ st->txn_scheduled = thread->throttle_trigger;
- /*
- * When not throttling, this is also the transaction's scheduled start
- * time.
- */
- if (!throttle_delay)
- st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
- }
+ /*
+ * stop client if next transaction is beyond pgbench end of
+ * execution
+ */
+ if (duration > 0 && st->txn_scheduled > end_time)
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
- /* Record statement start time if per-command latencies are requested */
- if (is_latencies)
- INSTR_TIME_SET_CURRENT(st->stmt_begin);
+ /*
+ * If this --latency-limit is used, and this slot is already
+ * late so that the transaction will miss the latency limit
+ * even if it completed immediately, we skip this time slot
+ * and iterate till the next slot that isn't late yet.
+ */
+ if (latency_limit)
+ {
+ int64 now_us;
- if (commands[st->state]->type == SQL_COMMAND)
- {
- const Command *command = commands[st->state];
- int r;
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ now_us = INSTR_TIME_GET_MICROSEC(now);
+ while (thread->throttle_trigger < now_us - latency_limit)
+ {
+ processXactStats(thread, st, &now, true, agg);
+ /* next rendez-vous */
+ wait = getPoissonRand(thread, throttle_delay);
+ thread->throttle_trigger += wait;
+ st->txn_scheduled = thread->throttle_trigger;
+ }
+ }
- if (querymode == QUERY_SIMPLE)
- {
- char *sql;
+ st->state = CSTATE_THROTTLE;
+ if (debug)
+ fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+ st->id, wait);
+ break;
- sql = pg_strdup(command->argv[0]);
- sql = assignVariables(st, sql);
+ /*
+ * Wait until it's time to start next transaction.
+ */
+ case CSTATE_THROTTLE:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+ return; /* Still sleeping, nothing to do here */
+
+ /* Else done sleeping, start the transaction */
+ st->state = CSTATE_START_TX;
+ break;
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQuery(st->con, sql);
- free(sql);
- }
- else if (querymode == QUERY_EXTENDED)
- {
- const char *sql = command->argv[0];
- const char *params[MAX_ARGS];
+ /* Start new transaction */
+ case CSTATE_START_TX:
- getQueryParams(st, command, params);
+ /*
+ * Establish connection on first call, or if is_connect is
+ * true.
+ */
+ if (st->con == NULL)
+ {
+ instr_time start;
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQueryParams(st->con, sql, command->argc - 1,
- NULL, params, NULL, NULL, 0);
- }
- else if (querymode == QUERY_PREPARED)
- {
- char name[MAX_PREPARE_NAME];
- const char *params[MAX_ARGS];
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ start = now;
+ if ((st->con = doConnect()) == NULL)
+ {
+ fprintf(stderr, "client %d aborted while establishing connection\n",
+ st->id);
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
- if (!st->prepared[st->use_file])
- {
- int j;
+ /* Reset session-local state */
+ memset(st->prepared, 0, sizeof(st->prepared));
+ }
- for (j = 0; commands[j] != NULL; j++)
+ /*
+ * Record transaction start time under logging, progress or
+ * throttling.
+ */
+ if (use_log || progress || throttle_delay || latency_limit ||
+ per_script_stats)
{
- PGresult *res;
- char name[MAX_PREPARE_NAME];
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->txn_begin = now;
+
+ /*
+ * When not throttling, this is also the transaction's
+ * scheduled start time.
+ */
+ if (!throttle_delay)
+ st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+ }
- if (commands[j]->type != SQL_COMMAND)
- continue;
- preparedStatementName(name, st->use_file, j);
- res = PQprepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- fprintf(stderr, "%s", PQerrorMessage(st->con));
- PQclear(res);
+ /* Begin with the first command */
+ st->command = 0;
+ st->state = CSTATE_START_COMMAND;
+ break;
+
+ /*
+ * Send a command to server (or execute a meta-command)
+ */
+ case CSTATE_START_COMMAND:
+ command = sql_script[st->use_file].commands[st->command];
+
+ /*
+ * If we reached the end of the script, move to end-of-xact
+ * processing.
+ */
+ if (command == NULL)
+ {
+ st->state = CSTATE_END_TX;
+ break;
}
- st->prepared[st->use_file] = true;
- }
- getQueryParams(st, command, params);
- preparedStatementName(name, st->use_file, st->state);
+ /*
+ * Record statement start time if per-command latencies are
+ * requested
+ */
+ if (is_latencies)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->stmt_begin = now;
+ }
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, name);
- r = PQsendQueryPrepared(st->con, name, command->argc - 1,
- params, NULL, NULL, 0);
- }
- else /* unknown sql mode */
- r = 0;
+ if (command->type == SQL_COMMAND)
+ {
+ if (!sendCommand(st, command))
+ {
+ /*
+ * Failed. Stay in CSTATE_START_COMMAND state, to
+ * retry. ??? What the point or retrying? Should
+ * rather abort?
+ */
+ return;
+ }
+ else
+ st->state = CSTATE_WAIT_RESULT;
+ }
+ else if (command->type == META_COMMAND)
+ {
+ int argc = command->argc,
+ i;
+ char **argv = command->argv;
- if (r == 0)
- {
- if (debug)
- fprintf(stderr, "client %d could not send %s\n",
- st->id, command->argv[0]);
- st->ecnt++;
- }
- else
- st->listen = true; /* flags that should be listened */
- }
- else if (commands[st->state]->type == META_COMMAND)
- {
- int argc = commands[st->state]->argc,
- i;
- char **argv = commands[st->state]->argv;
+ if (debug)
+ {
+ fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
+ for (i = 1; i < argc; i++)
+ fprintf(stderr, " %s", argv[i]);
+ fprintf(stderr, "\n");
+ }
- if (debug)
- {
- fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
- for (i = 1; i < argc; i++)
- fprintf(stderr, " %s", argv[i]);
- fprintf(stderr, "\n");
- }
+ if (pg_strcasecmp(argv[0], "sleep") == 0)
+ {
+ /*
+ * A \sleep doesn't execute anything, we just get the
+ * delay from the argument, and enter the CSTATE_SLEEP
+ * state. (The per-command latency will be recorded
+ * in CSTATE_SLEEP state, not here, after the delay
+ * has elapsed.)
+ */
+ int usec;
+
+ if (!evaluateSleep(st, argc, argv, &usec))
+ {
+ commandFailed(st, "execution of meta-command 'sleep' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
- if (pg_strcasecmp(argv[0], "set") == 0)
- {
- PgBenchExpr *expr = commands[st->state]->expr;
- PgBenchValue result;
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
+ st->state = CSTATE_SLEEP;
+ break;
+ }
+ else
+ {
+ if (pg_strcasecmp(argv[0], "set") == 0)
+ {
+ PgBenchExpr *expr = command->expr;
+ PgBenchValue result;
- if (!evaluateExpr(thread, st, expr, &result))
- {
- st->ecnt++;
- return true;
- }
+ if (!evaluateExpr(thread, st, expr, &result))
+ {
+ commandFailed(st, "evaluation of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
- if (!putVariableNumber(st, argv[0], argv[1], &result))
- {
- st->ecnt++;
- return true;
- }
+ if (!putVariableNumber(st, argv[0], argv[1], &result))
+ {
+ commandFailed(st, "assignment of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "setshell") == 0)
+ {
+ bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "sleep") == 0)
- {
- char *var;
- int usec;
- instr_time now;
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'setshell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "shell") == 0)
+ {
+ bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
- if (*argv[1] == ':')
- {
- if ((var = getVariable(st, argv[1] + 1)) == NULL)
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'shell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+
+ /*
+ * executing the expression or shell command might
+ * take a non-negligible amount of time, so reset
+ * 'now'
+ */
+ INSTR_TIME_SET_ZERO(now);
+
+ st->state = CSTATE_END_COMMAND;
+ }
+ }
+ break;
+
+ /*
+ * Wait for the current SQL command to complete
+ */
+ case CSTATE_WAIT_RESULT:
+ command = sql_script[st->use_file].commands[st->command];
+ if (debug)
+ fprintf(stderr, "client %d receiving\n", st->id);
+ if (!PQconsumeInput(st->con))
+ { /* there's something wrong */
+ commandFailed(st, "perhaps the backend died while processing");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ if (PQisBusy(st->con))
+ return; /* don't have the whole result yet */
+
+ /*
+ * Read and discard the query result;
+ */
+ res = PQgetResult(st->con);
+ switch (PQresultStatus(res))
{
- fprintf(stderr, "%s: undefined variable \"%s\"\n",
- argv[0], argv[1]);
- st->ecnt++;
- return true;
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ case PGRES_EMPTY_QUERY:
+ /* OK */
+ PQclear(res);
+ discard_response(st);
+ st->state = CSTATE_END_COMMAND;
+ break;
+ default:
+ commandFailed(st, PQerrorMessage(st->con));
+ PQclear(res);
+ st->state = CSTATE_ABORTED;
+ break;
}
- usec = atoi(var);
- }
- else
- usec = atoi(argv[1]);
+ break;
- if (argc > 2)
- {
- if (pg_strcasecmp(argv[2], "ms") == 0)
- usec *= 1000;
- else if (pg_strcasecmp(argv[2], "s") == 0)
- usec *= 1000000;
- }
- else
- usec *= 1000000;
+ /*
+ * Wait until sleep is done. This state is entered after a
+ * \sleep metacommand. The behavior is similar to
+ * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
+ * instead of CSTATE_START_TX.
+ */
+ case CSTATE_SLEEP:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+ return; /* Still sleeping, nothing to do here */
+ /* Else done sleeping. */
+ st->state = CSTATE_END_COMMAND;
+ break;
- INSTR_TIME_SET_CURRENT(now);
- st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec;
- st->sleeping = true;
+ /*
+ * End of command: record stats and proceed to next command.
+ */
+ case CSTATE_END_COMMAND:
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "setshell") == 0)
- {
- bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+ /*
+ * command completed: accumulate per-command execution times
+ * in thread-local data structure, if per-command latencies
+ * are requested.
+ */
+ if (is_latencies)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "shell") == 0)
- {
- bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+ /* XXX could use a mutex here, but we choose not to */
+ command = sql_script[st->use_file].commands[st->command];
+ addToSimpleStats(&command->stats,
+ INSTR_TIME_GET_DOUBLE(now) -
+ INSTR_TIME_GET_DOUBLE(st->stmt_begin));
+ }
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
+ /* Go ahead with next command */
+ st->command++;
+ st->state = CSTATE_START_COMMAND;
+ break;
- /* after a meta command, immediately proceed with next command */
- goto top;
- }
+ /*
+ * End of transaction.
+ */
+ case CSTATE_END_TX:
- return true;
+ /*
+ * transaction finished: calculate latency and log the
+ * transaction
+ */
+ if (progress || throttle_delay || latency_limit ||
+ per_script_stats || use_log)
+ processXactStats(thread, st, &now, false, agg);
+ else
+ thread->stats.cnt++;
+
+ if (is_connect)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ INSTR_TIME_SET_ZERO(now);
+ }
+
+ ++st->cnt;
+ if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+ {
+ /* exit success */
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+
+ /*
+ * No transaction is underway anymore.
+ */
+ st->state = CSTATE_CHOOSE_SCRIPT;
+
+ /*
+ * If we paced through all commands in the script in this
+ * loop, without returning to the caller even once, do it now.
+ * This gives the thread a chance to process other
+ * connections, and to do progress reporting. This can
+ * currently only happen if the script consists entirely of
+ * meta-commands.
+ */
+ if (end_tx_processed)
+ return;
+ else
+ {
+ end_tx_processed = true;
+ break;
+ }
+
+ /*
+ * Final states. Close the connection if it's still open.
+ */
+ case CSTATE_ABORTED:
+ case CSTATE_FINISHED:
+ if (st->con != NULL)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ }
+ return;
+ }
+ }
}
/*
- * print log entry after completing one transaction.
+ * Print log entry after completing one transaction.
+ *
+ * We print Unix-epoch timestamps in the log, so that entries can be
+ * correlated against other logs. On some platforms this could be obtained
+ * from the instr_time reading the caller has, but rather than get entangled
+ * with that, we just eat the cost of an extra syscall in all cases.
*/
static void
-doLog(TState *thread, CState *st, instr_time *now,
+doLog(TState *thread, CState *st,
StatsData *agg, bool skipped, double latency, double lag)
{
FILE *logfile = thread->logfile;
@@ -2213,15 +2464,17 @@ doLog(TState *thread, CState *st, instr_time *now,
if (agg_interval > 0)
{
/*
- * Loop until we reach the interval of the current transaction, and
- * print all the empty intervals in between (this may happen with very
- * low tps, e.g. --rate=0.1).
+ * Loop until we reach the interval of the current moment, and print
+ * any empty intervals in between (this may happen with very low tps,
+ * e.g. --rate=0.1).
*/
- while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
+ time_t now = time(NULL);
+
+ while (agg->start_time + agg_interval <= now)
{
/* print aggregated report to logfile */
fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
- agg->start_time,
+ (long) agg->start_time,
agg->cnt,
agg->latency.sum,
agg->latency.sum2,
@@ -2249,27 +2502,17 @@ doLog(TState *thread, CState *st, instr_time *now,
else
{
/* no, print raw transactions */
-#ifndef WIN32
+ struct timeval tv;
- /* This is more than we really ought to know about instr_time */
+ gettimeofday(&tv, NULL);
if (skipped)
fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
st->id, st->cnt, st->use_file,
- (long) now->tv_sec, (long) now->tv_usec);
+ (long) tv.tv_sec, (long) tv.tv_usec);
else
fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
- (long) now->tv_sec, (long) now->tv_usec);
-#else
-
- /* On Windows, instr_time doesn't provide a timestamp anyway */
- if (skipped)
- fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
- st->id, st->cnt, st->use_file);
- else
- fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
- st->id, st->cnt, latency, st->use_file);
-#endif
+ (long) tv.tv_sec, (long) tv.tv_usec);
if (throttle_delay)
fprintf(logfile, " %.0f", lag);
fputc('\n', logfile);
@@ -2288,7 +2531,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
double latency = 0.0,
lag = 0.0;
- if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
+ if ((!skipped) && INSTR_TIME_IS_ZERO(*now))
INSTR_TIME_SET_CURRENT(*now);
if (!skipped)
@@ -2310,7 +2553,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
thread->stats.cnt++;
if (use_log)
- doLog(thread, st, now, agg, skipped, latency, lag);
+ doLog(thread, st, agg, skipped, latency, lag);
/* XXX could use a mutex here, but we choose not to */
if (per_script_stats)
@@ -3032,7 +3275,7 @@ ParseScript(const char *script, const char *desc, int weight)
ps.desc = desc;
ps.weight = weight;
ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
- initStats(&ps.stats, 0.0);
+ initStats(&ps.stats, 0);
/* Prepare to parse script */
sstate = psql_scan_create(&pgbench_callbacks);
@@ -3326,6 +3569,7 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
tps_exclude = total->cnt / (time_include -
(INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
+ /* Report test parameters. */
printf("transaction type: %s\n",
num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
printf("scaling factor: %d\n", scale);
@@ -3362,9 +3606,11 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
if (throttle_delay || progress || latency_limit)
printSimpleStats("latency", &total->latency);
else
- /* only an average latency computed from the duration is available */
- printf("latency average: %.3f ms\n",
- 1000.0 * duration * nclients / total->cnt);
+ {
+ /* no measurement, show average latency computed from run time */
+ printf("latency average = %.3f ms\n",
+ 1000.0 * time_include * nclients / total->cnt);
+ }
if (throttle_delay)
{
@@ -3390,7 +3636,7 @@ printResults(TState *threads, StatsData *total, instr_time total_time,
{
if (num_scripts > 1)
printf("SQL script %d: %s\n"
- " - weight = %d (targets %.1f%% of total)\n"
+ " - weight: %d (targets %.1f%% of total)\n"
" - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
i + 1, sql_script[i].desc,
sql_script[i].weight,
@@ -3435,7 +3681,7 @@ main(int argc, char **argv)
{
static struct option long_options[] = {
/* systematic long/short named options */
- {"tpc-b", no_argument, NULL, 'b'},
+ {"builtin", required_argument, NULL, 'b'},
{"client", required_argument, NULL, 'c'},
{"connect", no_argument, NULL, 'C'},
{"debug", no_argument, NULL, 'd'},
@@ -3469,6 +3715,7 @@ main(int argc, char **argv)
{"sampling-rate", required_argument, NULL, 4},
{"aggregate-interval", required_argument, NULL, 5},
{"progress-timestamp", no_argument, NULL, 6},
+ {"log-prefix", required_argument, NULL, 7},
{NULL, 0, NULL, 0}
};
@@ -3807,10 +4054,6 @@ main(int argc, char **argv)
}
break;
case 5:
-#ifdef WIN32
- fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n");
- exit(1);
-#else
benchmarking_option_set = true;
agg_interval = atoi(optarg);
if (agg_interval <= 0)
@@ -3819,12 +4062,15 @@ main(int argc, char **argv)
optarg);
exit(1);
}
-#endif
break;
case 6:
progress_timestamp = true;
benchmarking_option_set = true;
break;
+ case 7:
+ benchmarking_option_set = true;
+ logfile_prefix = pg_strdup(optarg);
+ break;
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
@@ -3922,6 +4168,12 @@ main(int argc, char **argv)
exit(1);
}
+ if (!use_log && logfile_prefix)
+ {
+ fprintf(stderr, "log file prefix (--log-prefix) is allowed only when logging transactions (-l)\n");
+ exit(1);
+ }
+
if (duration > 0 && agg_interval > duration)
{
fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration);
@@ -4092,7 +4344,7 @@ main(int argc, char **argv)
thread->random_state[2] = random();
thread->logfile = NULL; /* filled in later */
thread->latency_late = 0;
- initStats(&thread->stats, 0.0);
+ initStats(&thread->stats, 0);
nclients_dealt += thread->nstate;
}
@@ -4146,7 +4398,7 @@ main(int argc, char **argv)
#endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
- initStats(&stats, 0.0);
+ initStats(&stats, 0);
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
@@ -4219,15 +4471,20 @@ threadRun(void *arg)
INSTR_TIME_SET_ZERO(thread->conn_time);
+ initStats(&aggs, time(NULL));
+ last = aggs;
+
/* open log file if requested */
if (use_log)
{
- char logpath[64];
+ char logpath[MAXPGPATH];
+ char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
if (thread->tid == 0)
- snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid);
+ snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
else
- snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid);
+ snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
+
thread->logfile = fopen(logpath, "w");
if (thread->logfile == NULL)
@@ -4252,101 +4509,85 @@ threadRun(void *arg)
INSTR_TIME_SET_CURRENT(thread->conn_time);
INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
- initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
- last = aggs;
-
- /* send start up queries in async manner */
+ /* explicitly initialize the state machines */
for (i = 0; i < nstate; i++)
{
- CState *st = &state[i];
- int prev_ecnt = st->ecnt;
- Command **commands;
-
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
-
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
- {
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- }
+ state[i].state = CSTATE_CHOOSE_SCRIPT;
}
+ /* loop till all clients have terminated */
while (remains > 0)
{
fd_set input_mask;
- int maxsock; /* max socket number to be waited */
- int64 now_usec = 0;
+ int maxsock; /* max socket number to be waited for */
int64 min_usec;
+ int64 now_usec = 0; /* set this only if needed */
+ /* identify which client sockets should be checked for input */
FD_ZERO(&input_mask);
-
maxsock = -1;
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
- int sock;
- if (st->con == NULL)
+ if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
- continue;
+ /* interrupt client that has not started a transaction */
+ st->state = CSTATE_FINISHED;
+ PQfinish(st->con);
+ st->con = NULL;
+ remains--;
}
- else if (st->sleeping)
+ else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
- if (st->throttling && timer_exceeded)
- {
- /* interrupt client which has not started a transaction */
- remains--;
- st->sleeping = false;
- st->throttling = false;
- PQfinish(st->con);
- st->con = NULL;
- continue;
- }
- else /* just a nap from the script */
+ /* a nap from the script, or under throttling */
+ int64 this_usec;
+
+ /* get current time if needed */
+ if (now_usec == 0)
{
- int this_usec;
+ instr_time now;
- if (min_usec == PG_INT64_MAX)
- {
- instr_time now;
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
+ }
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
- }
+ /* min_usec should be the minimum delay across all clients */
+ this_usec = (st->state == CSTATE_SLEEP ?
+ st->sleep_until : st->txn_scheduled) - now_usec;
+ if (min_usec > this_usec)
+ min_usec = this_usec;
+ }
+ else if (st->state == CSTATE_WAIT_RESULT)
+ {
+ /*
+ * waiting for result from server - nothing to do unless the
+ * socket is readable
+ */
+ int sock = PQsocket(st->con);
- this_usec = st->txn_scheduled - now_usec;
- if (min_usec > this_usec)
- min_usec = this_usec;
+ if (sock < 0)
+ {
+ fprintf(stderr, "invalid socket: %s",
+ PQerrorMessage(st->con));
+ goto done;
}
+
+ FD_SET(sock, &input_mask);
+ if (maxsock < sock)
+ maxsock = sock;
}
- else if (commands[st->state]->type == META_COMMAND)
+ else if (st->state != CSTATE_ABORTED &&
+ st->state != CSTATE_FINISHED)
{
- min_usec = 0; /* the connection is ready to run */
+ /*
+ * This client thread is ready to do something, so we don't
+ * want to wait. No need to examine additional clients.
+ */
+ min_usec = 0;
break;
}
-
- sock = PQsocket(st->con);
- if (sock < 0)
- {
- fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
- goto done;
- }
-
- FD_SET(sock, &input_mask);
-
- if (maxsock < sock)
- maxsock = sock;
}
/* also wake up to print the next progress report on time */
@@ -4368,9 +4609,10 @@ threadRun(void *arg)
}
/*
- * Sleep until we receive data from the server, or a nap-time
- * specified in the script ends, or it's time to print a progress
- * report.
+ * If no clients are ready to execute actions, sleep until we receive
+ * data from the server, or a nap-time specified in the script ends,
+ * or it's time to print a progress report. Update input_mask to show
+ * which client(s) received data.
*/
if (min_usec > 0 && maxsock != -1)
{
@@ -4389,22 +4631,29 @@ threadRun(void *arg)
if (nsocks < 0)
{
if (errno == EINTR)
+ {
+ /* On EINTR, go back to top of loop */
continue;
+ }
/* must be something wrong */
fprintf(stderr, "select() failed: %s\n", strerror(errno));
goto done;
}
}
+ else
+ {
+ /* If we didn't call select(), don't try to read any data */
+ FD_ZERO(&input_mask);
+ }
- /* ok, backend returns reply */
+ /* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
- int prev_ecnt = st->ecnt;
- if (st->con)
+ if (st->state == CSTATE_WAIT_RESULT)
{
+ /* don't call doCustom unless data is available */
int sock = PQsocket(st->con);
if (sock < 0)
@@ -4413,25 +4662,25 @@ threadRun(void *arg)
PQerrorMessage(st->con));
goto done;
}
- if (FD_ISSET(sock, &input_mask) ||
- commands[st->state]->type == META_COMMAND)
- {
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
- }
- }
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
+ if (!FD_ISSET(sock, &input_mask))
+ continue;
+ }
+ else if (st->state == CSTATE_FINISHED ||
+ st->state == CSTATE_ABORTED)
{
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
+ /* this client is done, no need to consider it anymore */
+ continue;
}
+
+ doCustom(thread, st, &aggs);
+
+ /* If doCustom changed client to finished state, reduce remains */
+ if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ remains--;
}
- /* progress report by thread 0 for all threads */
+ /* progress report is made by thread 0 for all threads */
if (progress && thread->tid == 0)
{
instr_time now_time;
@@ -4463,7 +4712,7 @@ threadRun(void *arg)
* (If a read from a 64-bit integer is not atomic, you might
* get a "torn" read and completely bogus latencies though!)
*/
- initStats(&cur, 0.0);
+ initStats(&cur, 0);
for (i = 0; i < nthreads; i++)
{
mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
@@ -4483,10 +4732,21 @@ threadRun(void *arg)
(cur.cnt - last.cnt);
if (progress_timestamp)
- sprintf(tbuf, "%.03f s",
- INSTR_TIME_GET_MILLISEC(now_time) / 1000.0);
+ {
+ /*
+ * On some platforms the current system timestamp is
+ * available in now_time, but rather than get entangled
+ * with that, we just eat the cost of an extra syscall in
+ * all cases.
+ */
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+ snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
+ (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
+ }
else
- sprintf(tbuf, "%.1f s", total_run);
+ snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
fprintf(stderr,
"progress: %s, %.1f tps, lat %.3f ms stddev %.3f",
@@ -4523,12 +4783,13 @@ done:
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
if (thread->logfile)
{
- if (agg_interval)
+ if (agg_interval > 0)
{
/* log aggregated but not yet reported transactions */
- doLog(thread, state, &end, &aggs, false, 0, 0);
+ doLog(thread, state, &aggs, false, 0, 0);
}
fclose(thread->logfile);
+ thread->logfile = NULL;
}
return NULL;
}
diff --git a/src/bin/pgbench/pgbench.h b/src/bin/pgbench/pgbench.h
index ab0f822010..38b3af5ab1 100644
--- a/src/bin/pgbench/pgbench.h
+++ b/src/bin/pgbench/pgbench.h
@@ -2,7 +2,7 @@
*
* pgbench.h
*
- * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------