diff options
| author | Pavan Deolasee | 2016-10-27 15:02:55 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2016-10-27 15:02:55 +0000 |
| commit | c52792488cd87e67e62ec61f5b56f461900353b4 (patch) | |
| tree | 02b4a719f979659de8f73fce6c1ca65cef2e323f /src/bin/pgbench | |
| parent | 891e6be57e5580b54a9df9fd42cb9bd10d0e7b21 (diff) | |
| parent | b5bce6c1ec6061c8a4f730d927e162db7e2ce365 (diff) | |
Merge commit 'b5bce6c1ec6061c8a4f730d927e162db7e2ce365'
Diffstat (limited to 'src/bin/pgbench')
| -rw-r--r-- | src/bin/pgbench/.gitignore | 1 | ||||
| -rw-r--r-- | src/bin/pgbench/Makefile | 10 | ||||
| -rw-r--r-- | src/bin/pgbench/exprparse.y | 251 | ||||
| -rw-r--r-- | src/bin/pgbench/exprscan.l | 375 | ||||
| -rw-r--r-- | src/bin/pgbench/pgbench.c | 3291 | ||||
| -rw-r--r-- | src/bin/pgbench/pgbench.h | 122 | ||||
| -rw-r--r-- | src/bin/pgbench/t/001_pgbench.pl | 25 |
7 files changed, 2577 insertions, 1498 deletions
diff --git a/src/bin/pgbench/.gitignore b/src/bin/pgbench/.gitignore index aae819ed70..983a3cd7a6 100644 --- a/src/bin/pgbench/.gitignore +++ b/src/bin/pgbench/.gitignore @@ -1,3 +1,4 @@ /exprparse.c /exprscan.c /pgbench +/tmp_check/ diff --git a/src/bin/pgbench/Makefile b/src/bin/pgbench/Makefile index 18fdf58d13..1503d00e12 100644 --- a/src/bin/pgbench/Makefile +++ b/src/bin/pgbench/Makefile @@ -10,6 +10,7 @@ include $(top_builddir)/src/Makefile.global OBJS = pgbench.o exprparse.o $(WIN32RES) override CPPFLAGS := -I. -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) +LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq ifneq ($(PORTNAME), win32) override CFLAGS += $(PTHREAD_CFLAGS) @@ -18,7 +19,7 @@ endif all: pgbench -pgbench: $(OBJS) | submake-libpq submake-libpgport +pgbench: $(OBJS) | submake-libpq submake-libpgport submake-libpgfeutils $(CC) $(CFLAGS) $^ $(libpq_pgport) $(PTHREAD_LIBS) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) # exprscan is compiled as part of exprparse @@ -37,6 +38,13 @@ uninstall: clean distclean: rm -f pgbench$(X) $(OBJS) + rm -rf tmp_check maintainer-clean: distclean rm -f exprparse.c exprscan.c + +check: + $(prove_check) + +installcheck: + $(prove_installcheck) diff --git a/src/bin/pgbench/exprparse.y b/src/bin/pgbench/exprparse.y index e68631e3e6..0cc665b75b 100644 --- a/src/bin/pgbench/exprparse.y +++ b/src/bin/pgbench/exprparse.y @@ -4,9 +4,11 @@ * exprparse.y * bison grammar for a simple expression syntax * - * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * + * src/bin/pgbench/exprparse.y + * *------------------------------------------------------------------------- */ @@ -16,29 +18,40 @@ PgBenchExpr *expr_parse_result; +static PgBenchExprList *make_elist(PgBenchExpr *exp, PgBenchExprList *list); static PgBenchExpr *make_integer_constant(int64 ival); +static 
PgBenchExpr *make_double_constant(double dval); static PgBenchExpr *make_variable(char *varname); -static PgBenchExpr *make_op(char operator, PgBenchExpr *lexpr, - PgBenchExpr *rexpr); +static PgBenchExpr *make_op(yyscan_t yyscanner, const char *operator, + PgBenchExpr *lexpr, PgBenchExpr *rexpr); +static int find_func(yyscan_t yyscanner, const char *fname); +static PgBenchExpr *make_func(yyscan_t yyscanner, int fnumber, PgBenchExprList *args); %} +%pure-parser %expect 0 %name-prefix="expr_yy" +%parse-param {yyscan_t yyscanner} +%lex-param {yyscan_t yyscanner} + %union { int64 ival; + double dval; char *str; PgBenchExpr *expr; + PgBenchExprList *elist; } +%type <elist> elist %type <expr> expr -%type <ival> INTEGER -%type <str> VARIABLE +%type <ival> INTEGER_CONST function +%type <dval> DOUBLE_CONST +%type <str> VARIABLE FUNCTION -%token INTEGER VARIABLE -%token CHAR_ERROR /* never used, will raise a syntax error */ +%token INTEGER_CONST DOUBLE_CONST VARIABLE FUNCTION /* Precedence: lowest to highest */ %left '+' '-' @@ -49,16 +62,27 @@ static PgBenchExpr *make_op(char operator, PgBenchExpr *lexpr, result: expr { expr_parse_result = $1; } +elist: { $$ = NULL; } + | expr { $$ = make_elist($1, NULL); } + | elist ',' expr { $$ = make_elist($3, $1); } + ; + expr: '(' expr ')' { $$ = $2; } | '+' expr %prec UMINUS { $$ = $2; } - | '-' expr %prec UMINUS { $$ = make_op('-', make_integer_constant(0), $2); } - | expr '+' expr { $$ = make_op('+', $1, $3); } - | expr '-' expr { $$ = make_op('-', $1, $3); } - | expr '*' expr { $$ = make_op('*', $1, $3); } - | expr '/' expr { $$ = make_op('/', $1, $3); } - | expr '%' expr { $$ = make_op('%', $1, $3); } - | INTEGER { $$ = make_integer_constant($1); } + | '-' expr %prec UMINUS { $$ = make_op(yyscanner, "-", + make_integer_constant(0), $2); } + | expr '+' expr { $$ = make_op(yyscanner, "+", $1, $3); } + | expr '-' expr { $$ = make_op(yyscanner, "-", $1, $3); } + | expr '*' expr { $$ = make_op(yyscanner, "*", $1, $3); } + | expr '/' 
expr { $$ = make_op(yyscanner, "/", $1, $3); } + | expr '%' expr { $$ = make_op(yyscanner, "%", $1, $3); } + | INTEGER_CONST { $$ = make_integer_constant($1); } + | DOUBLE_CONST { $$ = make_double_constant($1); } | VARIABLE { $$ = make_variable($1); } + | function '(' elist ')' { $$ = make_func(yyscanner, $1, $3); } + ; + +function: FUNCTION { $$ = find_func(yyscanner, $1); pg_free($1); } ; %% @@ -68,8 +92,20 @@ make_integer_constant(int64 ival) { PgBenchExpr *expr = pg_malloc(sizeof(PgBenchExpr)); - expr->etype = ENODE_INTEGER_CONSTANT; - expr->u.integer_constant.ival = ival; + expr->etype = ENODE_CONSTANT; + expr->u.constant.type = PGBT_INT; + expr->u.constant.u.ival = ival; + return expr; +} + +static PgBenchExpr * +make_double_constant(double dval) +{ + PgBenchExpr *expr = pg_malloc(sizeof(PgBenchExpr)); + + expr->etype = ENODE_CONSTANT; + expr->u.constant.type = PGBT_DOUBLE; + expr->u.constant.u.dval = dval; return expr; } @@ -84,15 +120,188 @@ make_variable(char *varname) } static PgBenchExpr * -make_op(char operator, PgBenchExpr *lexpr, PgBenchExpr *rexpr) +make_op(yyscan_t yyscanner, const char *operator, + PgBenchExpr *lexpr, PgBenchExpr *rexpr) +{ + return make_func(yyscanner, find_func(yyscanner, operator), + make_elist(rexpr, make_elist(lexpr, NULL))); +} + +/* + * List of available functions: + * - fname: function name + * - nargs: number of arguments + * -1 is a special value for least & greatest meaning #args >= 1 + * - tag: function identifier from PgBenchFunction enum + */ +static const struct +{ + const char *fname; + int nargs; + PgBenchFunction tag; +} PGBENCH_FUNCTIONS[] = +{ + /* parsed as operators, executed as functions */ + { + "+", 2, PGBENCH_ADD + }, + { + "-", 2, PGBENCH_SUB + }, + { + "*", 2, PGBENCH_MUL + }, + { + "/", 2, PGBENCH_DIV + }, + { + "%", 2, PGBENCH_MOD + }, + /* actual functions */ + { + "abs", 1, PGBENCH_ABS + }, + { + "least", -1, PGBENCH_LEAST + }, + { + "greatest", -1, PGBENCH_GREATEST + }, + { + "debug", 1, 
PGBENCH_DEBUG + }, + { + "pi", 0, PGBENCH_PI + }, + { + "sqrt", 1, PGBENCH_SQRT + }, + { + "int", 1, PGBENCH_INT + }, + { + "double", 1, PGBENCH_DOUBLE + }, + { + "random", 2, PGBENCH_RANDOM + }, + { + "random_gaussian", 3, PGBENCH_RANDOM_GAUSSIAN + }, + { + "random_exponential", 3, PGBENCH_RANDOM_EXPONENTIAL + }, + /* keep as last array element */ + { + NULL, 0, 0 + } +}; + +/* + * Find a function from its name + * + * return the index of the function from the PGBENCH_FUNCTIONS array + * or fail if the function is unknown. + */ +static int +find_func(yyscan_t yyscanner, const char *fname) +{ + int i = 0; + + while (PGBENCH_FUNCTIONS[i].fname) + { + if (pg_strcasecmp(fname, PGBENCH_FUNCTIONS[i].fname) == 0) + return i; + i++; + } + + expr_yyerror_more(yyscanner, "unexpected function name", fname); + + /* not reached */ + return -1; +} + +/* Expression linked list builder */ +static PgBenchExprList * +make_elist(PgBenchExpr *expr, PgBenchExprList *list) +{ + PgBenchExprLink *cons; + + if (list == NULL) + { + list = pg_malloc(sizeof(PgBenchExprList)); + list->head = NULL; + list->tail = NULL; + } + + cons = pg_malloc(sizeof(PgBenchExprLink)); + cons->expr = expr; + cons->next = NULL; + + if (list->head == NULL) + list->head = cons; + else + list->tail->next = cons; + + list->tail = cons; + + return list; +} + +/* Return the length of an expression list */ +static int +elist_length(PgBenchExprList *list) +{ + PgBenchExprLink *link = list != NULL ? 
list->head : NULL; + int len = 0; + + for (; link != NULL; link = link->next) + len++; + + return len; +} + +/* Build function call expression */ +static PgBenchExpr * +make_func(yyscan_t yyscanner, int fnumber, PgBenchExprList *args) { PgBenchExpr *expr = pg_malloc(sizeof(PgBenchExpr)); - expr->etype = ENODE_OPERATOR; - expr->u.operator.operator = operator; - expr->u.operator.lexpr = lexpr; - expr->u.operator.rexpr = rexpr; + Assert(fnumber >= 0); + + if (PGBENCH_FUNCTIONS[fnumber].nargs >= 0 && + PGBENCH_FUNCTIONS[fnumber].nargs != elist_length(args)) + expr_yyerror_more(yyscanner, "unexpected number of arguments", + PGBENCH_FUNCTIONS[fnumber].fname); + + /* check at least one arg for least & greatest */ + if (PGBENCH_FUNCTIONS[fnumber].nargs == -1 && + elist_length(args) == 0) + expr_yyerror_more(yyscanner, "at least one argument expected", + PGBENCH_FUNCTIONS[fnumber].fname); + + expr->etype = ENODE_FUNCTION; + expr->u.function.function = PGBENCH_FUNCTIONS[fnumber].tag; + + /* only the link is used, the head/tail is not useful anymore */ + expr->u.function.args = args != NULL ? args->head : NULL; + if (args) + pg_free(args); + return expr; } +/* + * exprscan.l is compiled as part of exprparse.y. Currently, this is + * unavoidable because exprparse does not create a .h file to export + * its token symbols. If these files ever grow large enough to be + * worth compiling separately, that could be fixed; but for now it + * seems like useless complication. + */ + +/* First, get rid of "#define yyscan_t" from pgbench.h */ +#undef yyscan_t +/* ... 
and the yylval macro, which flex will have its own definition for */ +#undef yylval + #include "exprscan.c" diff --git a/src/bin/pgbench/exprscan.l b/src/bin/pgbench/exprscan.l index 5331ab778b..20891a3b22 100644 --- a/src/bin/pgbench/exprscan.l +++ b/src/bin/pgbench/exprscan.l @@ -2,30 +2,52 @@ /*------------------------------------------------------------------------- * * exprscan.l - * a lexical scanner for a simple expression syntax + * lexical scanner for pgbench backslash commands * - * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * This lexer supports two operating modes: + * + * In INITIAL state, just parse off whitespace-separated words (this mode + * is basically equivalent to strtok(), which is what we used to use). + * + * In EXPR state, lex for the simple expression syntax of exprparse.y. + * + * In either mode, stop upon hitting newline or end of string. + * + * Note that this lexer operates within the framework created by psqlscan.l, + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * + * src/bin/pgbench/exprscan.l + * *------------------------------------------------------------------------- */ -/* line and column number for error reporting */ -static int yyline = 0, yycol = 0; +#include "fe_utils/psqlscan_int.h" -/* Handles to the buffer that the lexer uses internally */ -static YY_BUFFER_STATE scanbufhandle; -static char *scanbuf; -static int scanbuflen; +/* context information for reporting errors in expressions */ +static const char *expr_source = NULL; +static int expr_lineno = 0; +static int expr_start_offset = 0; +static const char *expr_command = NULL; + +/* indicates whether last yylex() call read a newline */ +static bool last_was_newline = false; + +/* + * Work around a bug in flex 2.5.35: it emits a couple of functions that + * it forgets to emit declarations for. 
Since we use -Wmissing-prototypes, + * this would cause warnings. Providing our own declarations should be + * harmless even when the bug gets fixed. + */ +extern int expr_yyget_column(yyscan_t yyscanner); +extern void expr_yyset_column(int column_no, yyscan_t yyscanner); -/* context information for error reporting */ -static char *expr_source = NULL; -static int expr_lineno = 0; -static char *expr_full_line = NULL; -static char *expr_command = NULL; -static int expr_col = 0; %} +/* Except for the prefix, these options should match psqlscan.l */ +%option reentrant +%option bison-bridge %option 8bit %option never-interactive %option nodefault @@ -35,95 +57,306 @@ static int expr_col = 0; %option warn %option prefix="expr_yy" -space [ \t\r\f] +/* Character classes */ +alpha [a-zA-Z_] +digit [0-9] +alnum [a-zA-Z0-9_] +/* {space} + {nonspace} + {newline} should cover all characters */ +space [ \t\r\f\v] +nonspace [^ \t\r\f\v\n] +newline [\n] + +/* Exclusive states */ +%x EXPR %% -"+" { yycol += yyleng; return '+'; } -"-" { yycol += yyleng; return '-'; } -"*" { yycol += yyleng; return '*'; } -"/" { yycol += yyleng; return '/'; } -"%" { yycol += yyleng; return '%'; } -"(" { yycol += yyleng; return '('; } -")" { yycol += yyleng; return ')'; } - -:[a-zA-Z0-9_]+ { - yycol += yyleng; - yylval.str = pg_strdup(yytext + 1); +%{ + /* Declare some local variables inside yylex(), for convenience */ + PsqlScanState cur_state = yyextra; + + /* + * Force flex into the state indicated by start_state. This has a + * couple of purposes: it lets some of the functions below set a new + * starting state without ugly direct access to flex variables, and it + * allows us to transition from one flex lexer to another so that we + * can lex different parts of the source string using separate lexers. 
+ */ + BEGIN(cur_state->start_state); + + /* Reset was-newline flag */ + last_was_newline = false; +%} + + /* INITIAL state */ + +{nonspace}+ { + /* Found a word, emit and return it */ + psqlscan_emit(cur_state, yytext, yyleng); + return 1; + } + +{space}+ { /* ignore */ } + +{newline} { + /* report end of command */ + last_was_newline = true; + return 0; + } + + /* EXPR state */ + +<EXPR>{ + +"+" { return '+'; } +"-" { return '-'; } +"*" { return '*'; } +"/" { return '/'; } +"%" { return '%'; } +"(" { return '('; } +")" { return ')'; } +"," { return ','; } + +:{alnum}+ { + yylval->str = pg_strdup(yytext + 1); return VARIABLE; } -[0-9]+ { - yycol += yyleng; - yylval.ival = strtoint64(yytext); - return INTEGER; +{digit}+ { + yylval->ival = strtoint64(yytext); + return INTEGER_CONST; + } +{digit}+(\.{digit}*)?([eE][-+]?{digit}+)? { + yylval->dval = atof(yytext); + return DOUBLE_CONST; + } +\.{digit}+([eE][-+]?{digit}+)? { + yylval->dval = atof(yytext); + return DOUBLE_CONST; + } +{alpha}{alnum}* { + yylval->str = pg_strdup(yytext); + return FUNCTION; } -[\n] { yycol = 0; yyline++; } -{space}+ { yycol += yyleng; /* ignore */ } +{newline} { + /* report end of command */ + last_was_newline = true; + return 0; + } + +{space}+ { /* ignore */ } . 
{ - yycol += yyleng; - syntax_error(expr_source, expr_lineno, expr_full_line, expr_command, - "unexpected character", yytext, expr_col + yycol); - /* dead code, exit is called from syntax_error */ - return CHAR_ERROR; + /* + * must strdup yytext so that expr_yyerror_more doesn't + * change it while finding end of line + */ + expr_yyerror_more(yyscanner, "unexpected character", + pg_strdup(yytext)); + /* NOTREACHED, syntax_error calls exit() */ + return 0; + } + +} + +<<EOF>> { + if (cur_state->buffer_stack == NULL) + return 0; /* end of input reached */ + + /* + * We were expanding a variable, so pop the inclusion + * stack and keep lexing + */ + psqlscan_pop_buffer_stack(cur_state); + psqlscan_select_top_buffer(cur_state); } + %% void -yyerror(const char *message) +expr_yyerror_more(yyscan_t yyscanner, const char *message, const char *more) +{ + PsqlScanState state = yyget_extra(yyscanner); + int error_detection_offset = expr_scanner_offset(state) - 1; + YYSTYPE lval; + char *full_line; + size_t l; + + /* + * While parsing an expression, we may not have collected the whole line + * yet from the input source. Lex till EOL so we can report whole line. + * (If we're at EOF, it's okay to call yylex() an extra time.) 
+ */ + if (!last_was_newline) + { + while (yylex(&lval, yyscanner)) + /* skip */ ; + } + + full_line = expr_scanner_get_substring(state, + expr_start_offset, + expr_scanner_offset(state)); + /* Trim trailing newline if any */ + l = strlen(full_line); + while (l > 0 && full_line[l - 1] == '\n') + full_line[--l] = '\0'; + + syntax_error(expr_source, expr_lineno, full_line, expr_command, + message, more, error_detection_offset - expr_start_offset); +} + +void +expr_yyerror(yyscan_t yyscanner, const char *message) { - syntax_error(expr_source, expr_lineno, expr_full_line, expr_command, - message, NULL, expr_col + yycol); + expr_yyerror_more(yyscanner, message, NULL); } /* - * Called before any actual parsing is done + * Collect a space-separated word from a backslash command and return it + * in word_buf, along with its starting string offset in *offset. + * Returns true if successful, false if at end of command. */ -void -expr_scanner_init(const char *str, const char *source, - const int lineno, const char *line, - const char *cmd, const int ecol) +bool +expr_lex_one_word(PsqlScanState state, PQExpBuffer word_buf, int *offset) { - Size slen = strlen(str); + int lexresult; + YYSTYPE lval; + + /* Must be scanning already */ + Assert(state->scanbufhandle != NULL); - /* save context informations for error messages */ - expr_source = (char *) source; - expr_lineno = (int) lineno; - expr_full_line = (char *) line; - expr_command = (char *) cmd; - expr_col = (int) ecol; + /* Set current output target */ + state->output_buf = word_buf; + resetPQExpBuffer(word_buf); + + /* Set input source */ + if (state->buffer_stack != NULL) + yy_switch_to_buffer(state->buffer_stack->buf, state->scanner); + else + yy_switch_to_buffer(state->scanbufhandle, state->scanner); + + /* Set start state */ + state->start_state = INITIAL; + + /* And lex. */ + lexresult = yylex(&lval, state->scanner); /* - * Might be left over after error + * Save start offset of word, if any. 
We could do this more efficiently, + * but for now this seems fine. */ - if (YY_CURRENT_BUFFER) - yy_delete_buffer(YY_CURRENT_BUFFER); + if (lexresult) + *offset = expr_scanner_offset(state) - word_buf->len; + else + *offset = -1; /* - * Make a scan buffer with special termination needed by flex. + * In case the caller returns to using the regular SQL lexer, reselect the + * appropriate initial state. */ - scanbuflen = slen; - scanbuf = pg_malloc(slen + 2); - memcpy(scanbuf, str, slen); - scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR; - scanbufhandle = yy_scan_buffer(scanbuf, slen + 2); + psql_scan_reselect_sql_lexer(state); - BEGIN(INITIAL); + return (bool) lexresult; } +/* + * Prepare to lex an expression via expr_yyparse(). + * + * Returns the yyscan_t that is to be passed to expr_yyparse(). + * (This is just state->scanner, but callers don't need to know that.) + */ +yyscan_t +expr_scanner_init(PsqlScanState state, + const char *source, int lineno, int start_offset, + const char *command) +{ + /* Save error context info */ + expr_source = source; + expr_lineno = lineno; + expr_start_offset = start_offset; + expr_command = command; + + /* Must be scanning already */ + Assert(state->scanbufhandle != NULL); + + /* Set current output target */ + state->output_buf = NULL; + + /* Set input source */ + if (state->buffer_stack != NULL) + yy_switch_to_buffer(state->buffer_stack->buf, state->scanner); + else + yy_switch_to_buffer(state->scanbufhandle, state->scanner); + + /* Set start state */ + state->start_state = EXPR; + + return state->scanner; +} /* - * Called after parsing is done to clean up after seg_scanner_init() + * Finish lexing an expression. 
*/ void -expr_scanner_finish(void) +expr_scanner_finish(yyscan_t yyscanner) { - yy_delete_buffer(scanbufhandle); - pg_free(scanbuf); - expr_source = NULL; - expr_lineno = 0; - expr_full_line = NULL; - expr_command = NULL; - expr_col = 0; + PsqlScanState state = yyget_extra(yyscanner); + + /* + * Reselect appropriate initial state for SQL lexer. + */ + psql_scan_reselect_sql_lexer(state); +} + +/* + * Get offset from start of string to end of current lexer token. + * + * We rely on the knowledge that flex modifies the scan buffer by storing + * a NUL at the end of the current token (yytext). Note that this might + * not work quite right if we were parsing a sub-buffer, but since pgbench + * never invokes that functionality, it doesn't matter. + */ +int +expr_scanner_offset(PsqlScanState state) +{ + return strlen(state->scanbuf); +} + +/* + * Get a malloc'd copy of the lexer input string from start_offset + * to just before end_offset. + */ +char * +expr_scanner_get_substring(PsqlScanState state, + int start_offset, int end_offset) +{ + char *result; + int slen = end_offset - start_offset; + + Assert(slen >= 0); + Assert(end_offset <= strlen(state->scanbuf)); + result = (char *) pg_malloc(slen + 1); + memcpy(result, state->scanbuf + start_offset, slen); + result[slen] = '\0'; + + return result; +} + +/* + * Get the line number associated with the given string offset + * (which must not be past the end of where we've lexed to). + */ +int +expr_scanner_get_lineno(PsqlScanState state, int offset) +{ + int lineno = 1; + const char *p = state->scanbuf; + + while (*p && offset > 0) + { + if (*p == '\n') + lineno++; + p++, offset--; + } + return lineno; } diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 687becdead..28af19e20c 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -5,7 +5,7 @@ * Originally written by Tatsuo Ishii and enhanced by many contributors. 
* * src/bin/pgbench/pgbench.c - * Copyright (c) 2000-2015, PostgreSQL Global Development Group + * Copyright (c) 2000-2016, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its @@ -38,6 +38,8 @@ #include "portability/instr_time.h" #include <ctype.h> +#include <float.h> +#include <limits.h> #include <math.h> #include <signal.h> #include <sys/time.h> @@ -55,6 +57,8 @@ #include "pgbench.h" +#define ERRCODE_UNDEFINED_TABLE "42P01" + /* * Multi-platform pthread implementations */ @@ -70,20 +74,8 @@ static int pthread_join(pthread_t th, void **thread_return); /* Use platform-dependent pthread capability */ #include <pthread.h> #else -/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */ -#define PTHREAD_FORK_EMULATION -#include <sys/wait.h> - -#define pthread_t pg_pthread_t -#define pthread_attr_t pg_pthread_attr_t -#define pthread_create pg_pthread_create -#define pthread_join pg_pthread_join - -typedef struct fork_pthread *pthread_t; -typedef int pthread_attr_t; - -static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); -static int pthread_join(pthread_t th, void **thread_return); +/* No threads implementation, use none (-j 1) */ +#define pthread_t void * #endif @@ -100,10 +92,11 @@ static int pthread_join(pthread_t th, void **thread_return); #define LOG_STEP_SECONDS 5 /* seconds between log messages */ #define DEFAULT_NXACTS 10 /* default nxacts */ -#define MIN_GAUSSIAN_THRESHOLD 2.0 /* minimum threshold for gauss */ +#define MIN_GAUSSIAN_PARAM 2.0 /* minimum parameter for gauss */ int nxacts = 0; /* number of transactions per client */ int duration = 0; /* duration in seconds */ +int64 end_time = 0; /* when to stop in micro seconds, under -T */ /* * scaling factor. 
for example, scale = 10 will make 1000000 tuples in @@ -177,11 +170,11 @@ bool use_log; /* log transaction latencies to a file */ bool use_quiet; /* quiet logging onto stderr */ int agg_interval; /* log aggregates instead of individual * transactions */ +bool per_script_stats = false; /* whether to collect stats per script */ int progress = 0; /* thread progress report every this seconds */ -int progress_nclients = 0; /* number of clients for progress - * report */ -int progress_nthreads = 0; /* number of threads for progress - * report */ +bool progress_timestamp = false; /* progress report with Unix time */ +int nclients = 1; /* number of clients */ +int nthreads = 1; /* number of threads */ bool is_connect; /* establish connection for each transaction */ bool is_latencies; /* report per-command latencies */ int main_pid; /* main process id used in log filename */ @@ -192,47 +185,86 @@ char *login = NULL; char *dbName; const char *progname; +#define WSEP '@' /* weight separator */ + volatile bool timer_exceeded = false; /* flag from signal handler */ -/* variable definitions */ +/* + * Variable definitions. If a variable has a string value, "value" is that + * value, is_numeric is false, and num_value is undefined. If the value is + * known to be numeric, is_numeric is true and num_value contains the value + * (in any permitted numeric variant). In this case "value" contains the + * string equivalent of the number, if we've had occasion to compute that, + * or NULL if we haven't. + */ typedef struct { - char *name; /* variable name */ - char *value; /* its value */ + char *name; /* variable's name */ + char *value; /* its value in string form, if known */ + bool is_numeric; /* is numeric value known? 
*/ + PgBenchValue num_value; /* variable's value in numeric form */ } Variable; -#define MAX_FILES 128 /* max number of SQL script files allowed */ +#define MAX_SCRIPTS 128 /* max number of SQL scripts allowed */ #define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */ /* - * structures used in custom query mode + * Simple data structure to keep stats about something. + * + * XXX probably the first value should be kept and used as an offset for + * better numerical stability... + */ +typedef struct SimpleStats +{ + int64 count; /* how many values were encountered */ + double min; /* the minimum seen */ + double max; /* the maximum seen */ + double sum; /* sum of values */ + double sum2; /* sum of squared values */ +} SimpleStats; + +/* + * Data structure to hold various statistics: per-thread and per-script stats + * are maintained and merged together. */ +typedef struct StatsData +{ + long start_time; /* interval start time, for aggregates */ + int64 cnt; /* number of transactions */ + int64 skipped; /* number of transactions skipped under --rate + * and --latency-limit */ + SimpleStats latency; + SimpleStats lag; +} StatsData; +/* + * Connection state + */ typedef struct { PGconn *con; /* connection handle to DB */ int id; /* client No. */ int state; /* state No. */ - int cnt; /* xacts count */ - int ecnt; /* error count */ - int listen; /* 0 indicates that an async query has been - * sent */ - int sleeping; /* 1 indicates that the client is napping */ + bool listen; /* whether an async query has been sent */ + bool sleeping; /* whether the client is napping */ bool throttling; /* whether nap is for throttling */ + bool is_throttled; /* whether transaction throttling is done */ Variable *variables; /* array of variable definitions */ - int nvariables; + int nvariables; /* number of variables */ + bool vars_sorted; /* are variables sorted by name? 
*/ int64 txn_scheduled; /* scheduled start time of transaction (usec) */ instr_time txn_begin; /* used for measuring schedule lag times */ instr_time stmt_begin; /* used for measuring statement latencies */ - int64 txn_latencies; /* cumulated latencies */ - int64 txn_sqlats; /* cumulated square latencies */ - bool is_throttled; /* whether transaction throttling is done */ - int use_file; /* index in sql_files for this client */ - bool prepared[MAX_FILES]; + int use_file; /* index in sql_scripts for this client */ + bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ + + /* per client collected stats */ + int64 cnt; /* transaction count */ + int ecnt; /* error count */ } CState; /* - * Thread state and result + * Thread state */ typedef struct { @@ -240,32 +272,19 @@ typedef struct pthread_t thread; /* thread handle */ CState *state; /* array of CState */ int nstate; /* length of state[] */ - instr_time start_time; /* thread start time */ - instr_time *exec_elapsed; /* time spent executing cmds (per Command) */ - int *exec_count; /* number of cmd executions (per Command) */ unsigned short random_state[3]; /* separate randomness for each thread */ int64 throttle_trigger; /* previous/next throttling (us) */ - int64 throttle_lag; /* total transaction lag behind throttling */ - int64 throttle_lag_max; /* max transaction lag */ - int64 throttle_latency_skipped; /* lagging transactions - * skipped */ - int64 latency_late; /* late transactions */ + FILE *logfile; /* where to log, or NULL */ + + /* per thread collected stats */ + instr_time start_time; /* thread start time */ + instr_time conn_time; + StatsData stats; + int64 latency_late; /* executed but late transactions */ } TState; #define INVALID_THREAD ((pthread_t) 0) -typedef struct -{ - instr_time conn_time; - int64 xacts; - int64 latencies; - int64 sqlats; - int64 throttle_lag; - int64 throttle_lag_max; - int64 throttle_latency_skipped; - int64 latency_late; -} TResult; - /* * queries read from 
files */ @@ -286,124 +305,137 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"}; typedef struct { - char *line; /* full text of command line */ + char *line; /* text of command line */ int command_num; /* unique index of this Command struct */ int type; /* command type (SQL_COMMAND or META_COMMAND) */ int argc; /* number of command words */ char *argv[MAX_ARGS]; /* command word list */ - int cols[MAX_ARGS]; /* corresponding column starting from 1 */ - PgBenchExpr *expr; /* parsed expression */ + PgBenchExpr *expr; /* parsed expression, if needed */ + SimpleStats stats; /* time spent in this command */ } Command; -typedef struct +typedef struct ParsedScript { + const char *desc; /* script descriptor (eg, file name) */ + int weight; /* selection weight */ + Command **commands; /* NULL-terminated array of Commands */ + StatsData stats; /* total time spent in script */ +} ParsedScript; + +static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */ +static int num_scripts; /* number of scripts in sql_script[] */ +static int num_commands = 0; /* total number of Command structs */ +static int64 total_weight = 0; - long start_time; /* when does the interval start */ - int cnt; /* number of transactions */ - int skipped; /* number of transactions skipped under --rate - * and --latency-limit */ - - double min_latency; /* min/max latencies */ - double max_latency; - double sum_latency; /* sum(latency), sum(latency^2) - for - * estimates */ - double sum2_latency; +static int debug = 0; /* debug flag */ - double min_lag; - double max_lag; - double sum_lag; /* sum(lag) */ - double sum2_lag; /* sum(lag*lag) */ -} AggVals; +/* Builtin test scripts */ +typedef struct BuiltinScript +{ + const char *name; /* very short name for -b ... 
*/ + const char *desc; /* short description */ + const char *script; /* actual pgbench script */ +} BuiltinScript; -static Command **sql_files[MAX_FILES]; /* SQL script files */ -static int num_files; /* number of script files */ -static int num_commands = 0; /* total number of Command structs */ -static int debug = 0; /* debug flag */ -/* default scenario */ -static char *tpc_b = { - "\\set nbranches " CppAsString2(nbranches) " * :scale\n" - "\\set ntellers " CppAsString2(ntellers) " * :scale\n" - "\\set naccounts " CppAsString2(naccounts) " * :scale\n" - "\\setrandom aid 1 :naccounts\n" - "\\setrandom bid 1 :nbranches\n" - "\\setrandom tid 1 :ntellers\n" - "\\setrandom delta -5000 5000\n" - "BEGIN;\n" - "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n" - "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" - "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n" - "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n" - "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" - "END;\n" -}; +static const BuiltinScript builtin_script[] = +{ + { + "tpcb-like", + "<builtin: TPC-B (sort of)>", + "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n" + "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n" + "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n" + "\\set delta random(-5000, 5000)\n" + "BEGIN;\n" + "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n" + "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" + "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n" + "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n" + "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" + "END;\n" + }, #ifdef PGXC -static char *tpc_b_bid = { - "\\set nbranches " 
CppAsString2(nbranches) " * :scale\n" - "\\set ntellers " CppAsString2(ntellers) " * :scale\n" - "\\set naccounts " CppAsString2(naccounts) " * :scale\n" - "\\setrandom aid 1 :naccounts\n" - "\\setrandom bid 1 :nbranches\n" - "\\setrandom tid 1 :ntellers\n" - "\\setrandom delta -5000 5000\n" - "BEGIN;\n" - "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid AND bid = :bid;\n" - "SELECT abalance FROM pgbench_accounts WHERE aid = :aid AND bid = :bid\n" - "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid AND bid = :bid;\n" - "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n" - "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" - "END;\n" -}; + { + "tpcb-like-bid", + "<builtin: TPC-B (sort of)>", + "\\set nbranches " CppAsString2(nbranches) " * :scale\n" + "\\set ntellers " CppAsString2(ntellers) " * :scale\n" + "\\set naccounts " CppAsString2(naccounts) " * :scale\n" + "\\setrandom aid 1 :naccounts\n" + "\\setrandom bid 1 :nbranches\n" + "\\setrandom tid 1 :ntellers\n" + "\\setrandom delta -5000 5000\n" + "BEGIN;\n" + "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid AND bid = :bid;\n" + "SELECT abalance FROM pgbench_accounts WHERE aid = :aid AND bid = :bid\n" + "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid AND bid = :bid;\n" + "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n" + "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" + "END;\n" + }, #endif - - -/* -N case */ -static char *simple_update = { - "\\set nbranches " CppAsString2(nbranches) " * :scale\n" - "\\set ntellers " CppAsString2(ntellers) " * :scale\n" - "\\set naccounts " CppAsString2(naccounts) " * :scale\n" - "\\setrandom aid 1 :naccounts\n" - "\\setrandom bid 1 :nbranches\n" - "\\setrandom tid 1 :ntellers\n" - 
"\\setrandom delta -5000 5000\n" - "BEGIN;\n" - "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n" - "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" - "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" - "END;\n" -}; - + { + "simple-update", + "<builtin: simple update>", + "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n" + "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n" + "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n" + "\\set delta random(-5000, 5000)\n" + "BEGIN;\n" + "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n" + "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" + "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" + "END;\n" + }, #ifdef PGXC -static char *simple_update_bid = { - "\\set nbranches " CppAsString2(nbranches) " * :scale\n" - "\\set ntellers " CppAsString2(ntellers) " * :scale\n" - "\\set naccounts " CppAsString2(naccounts) " * :scale\n" - "\\setrandom aid 1 :naccounts\n" - "\\setrandom bid 1 :nbranches\n" - "\\setrandom tid 1 :ntellers\n" - "\\setrandom delta -5000 5000\n" - "BEGIN;\n" - "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid AND bid = :bid;\n" - "SELECT abalance FROM pgbench_accounts WHERE aid = :aid AND bid = :bid;\n" - "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" - "END;\n" -}; + { + "simple-update-bid", + "<builtin: simple update bid>", + "\\set nbranches " CppAsString2(nbranches) " * :scale\n" + "\\set ntellers " CppAsString2(ntellers) " * :scale\n" + "\\set naccounts " CppAsString2(naccounts) " * :scale\n" + "\\setrandom aid 1 :naccounts\n" + "\\setrandom bid 1 :nbranches\n" + "\\setrandom tid 1 :ntellers\n" + "\\setrandom delta -5000 5000\n" + "BEGIN;\n" + "UPDATE pgbench_accounts SET 
abalance = abalance + :delta WHERE aid = :aid AND bid = :bid;\n" + "SELECT abalance FROM pgbench_accounts WHERE aid = :aid AND bid = :bid;\n" + "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n" + "END;\n" + }, #endif - -/* -S case */ -static char *select_only = { - "\\set naccounts " CppAsString2(naccounts) " * :scale\n" - "\\setrandom aid 1 :naccounts\n" - "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" + { + "select-only", + "<builtin: select only>", + "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n" + "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n" + } }; + /* Function prototypes */ -static void setalarm(int seconds); +static void setIntValue(PgBenchValue *pv, int64 ival); +static void setDoubleValue(PgBenchValue *pv, double dval); +static bool evaluateExpr(TState *, CState *, PgBenchExpr *, PgBenchValue *); +static void doLog(TState *thread, CState *st, instr_time *now, + StatsData *agg, bool skipped, double latency, double lag); +static void processXactStats(TState *thread, CState *st, instr_time *now, + bool skipped, StatsData *agg); +static void pgbench_error(const char *fmt,...) 
pg_attribute_printf(1, 2); +static void addScript(ParsedScript script); static void *threadRun(void *arg); +static void setalarm(int seconds); + + +/* callback functions for our flex lexer */ +static const PsqlScanCallbacks pgbench_callbacks = { + NULL, /* don't need get_variable functionality */ + pgbench_error +}; -static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, - AggVals *agg, bool skipped); static void usage(void) @@ -425,33 +457,38 @@ usage(void) " create indexes in the specified tablespace\n" " --tablespace=TABLESPACE create tables in the specified tablespace\n" " --unlogged-tables create tables as unlogged tables\n" + "\nOptions to select what to run:\n" + " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n" + " (use \"-b list\" to list available scripts)\n" + " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n" + " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n" + " (same as \"-b simple-update\")\n" + " -S, --select-only perform SELECT-only transactions\n" + " (same as \"-b select-only\")\n" "\nBenchmarking options:\n" " -c, --client=NUM number of concurrent database clients (default: 1)\n" " -C, --connect establish new connection for each transaction\n" " -D, --define=VARNAME=VALUE\n" " define variable for use by custom script\n" - " -f, --file=FILENAME read transaction script from FILENAME\n" #ifdef PGXC " -k query with default key and additional key branch id (bid)\n" #endif " -j, --jobs=NUM number of threads (default: 1)\n" " -l, --log write transaction times to log file\n" - " -L, --latency-limit=NUM count transactions lasting more than NUM ms\n" - " as late.\n" + " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n" " -M, --protocol=simple|extended|prepared\n" " protocol for submitting queries (default: simple)\n" " -n, --no-vacuum do not run VACUUM before tests\n" - " -N, --skip-some-updates skip updates of 
pgbench_tellers and pgbench_branches\n" " -P, --progress=NUM show thread progress report every NUM seconds\n" " -r, --report-latencies report average latency per command\n" " -R, --rate=NUM target rate in transactions per second\n" " -s, --scale=NUM report this scale factor in output\n" - " -S, --select-only perform SELECT-only transactions\n" " -t, --transactions=NUM number of transactions each client runs (default: 10)\n" " -T, --time=NUM duration of benchmark test in seconds\n" " -v, --vacuum-all vacuum all four standard tables before tests\n" " --aggregate-interval=NUM aggregate data over NUM seconds\n" - " --sampling-rate=NUM fraction of transactions to log (e.g. 0.01 for 1%%)\n" + " --progress-timestamp use Unix epoch timestamps for progress\n" + " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n" "\nCommon options:\n" " -d, --debug print debugging output\n" " -h, --host=HOSTNAME database server host or socket directory\n" @@ -464,6 +501,33 @@ usage(void) progname, progname); } +/* return whether str matches "^\s*[-+]?[0-9]+$" */ +static bool +is_an_int(const char *str) +{ + const char *ptr = str; + + /* skip leading spaces; cast is consistent with strtoint64 */ + while (*ptr && isspace((unsigned char) *ptr)) + ptr++; + + /* skip sign */ + if (*ptr == '+' || *ptr == '-') + ptr++; + + /* at least one digit */ + if (*ptr && !isdigit((unsigned char) *ptr)) + return false; + + /* eat all digits */ + while (*ptr && isdigit((unsigned char) *ptr)) + ptr++; + + /* must have reached end of string */ + return *ptr == '\0'; +} + + /* * strtoint64 -- convert a string to 64-bit integer * @@ -550,48 +614,52 @@ getrand(TState *thread, int64 min, int64 max) /* * random number generator: exponential distribution from min to max inclusive. - * the threshold is so that the density of probability for the last cut-off max - * value is exp(-threshold). 
+ * the parameter is so that the density of probability for the last cut-off max + * value is exp(-parameter). */ static int64 -getExponentialRand(TState *thread, int64 min, int64 max, double threshold) +getExponentialRand(TState *thread, int64 min, int64 max, double parameter) { double cut, uniform, rand; - Assert(threshold > 0.0); - cut = exp(-threshold); + /* abort if wrong parameter, but must really be checked beforehand */ + Assert(parameter > 0.0); + cut = exp(-parameter); /* erand in [0, 1), uniform in (0, 1] */ uniform = 1.0 - pg_erand48(thread->random_state); /* - * inner expresion in (cut, 1] (if threshold > 0), rand in [0, 1) + * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1) */ Assert((1.0 - cut) != 0.0); - rand = -log(cut + (1.0 - cut) * uniform) / threshold; + rand = -log(cut + (1.0 - cut) * uniform) / parameter; /* return int64 random number within between min and max */ return min + (int64) ((max - min + 1) * rand); } /* random number generator: gaussian distribution from min to max inclusive */ static int64 -getGaussianRand(TState *thread, int64 min, int64 max, double threshold) +getGaussianRand(TState *thread, int64 min, int64 max, double parameter) { double stdev; double rand; + /* abort if parameter is too low, but must really be checked beforehand */ + Assert(parameter >= MIN_GAUSSIAN_PARAM); + /* - * Get user specified random number from this loop, with -threshold < - * stdev <= threshold + * Get user specified random number from this loop, with -parameter < + * stdev <= parameter * * This loop is executed until the number is in the expected range. * - * As the minimum threshold is 2.0, the probability of looping is low: + * As the minimum parameter is 2.0, the probability of looping is low: * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the * average sinus multiplier as 2/pi, we have a 8.6% looping probability in - * the worst case. 
For a 5.0 threshold value, the looping probability is - * about e^{-5} * 2 / pi ~ 0.43%. + * the worst case. For a parameter value of 5.0, the looping probability + * is about e^{-5} * 2 / pi ~ 0.43%. */ do { @@ -615,10 +683,10 @@ getGaussianRand(TState *thread, int64 min, int64 max, double threshold) * over. */ } - while (stdev < -threshold || stdev >= threshold); + while (stdev < -parameter || stdev >= parameter); - /* stdev is in [-threshold, threshold), normalization to [0,1) */ - rand = (stdev + threshold) / (threshold * 2.0); + /* stdev is in [-parameter, parameter), normalization to [0,1) */ + rand = (stdev + parameter) / (parameter * 2.0); /* return int64 random number within between min and max */ return min + (int64) ((max - min + 1) * rand); @@ -643,6 +711,82 @@ getPoissonRand(TState *thread, int64 center) return (int64) (-log(uniform) * ((double) center) + 0.5); } +/* + * Initialize the given SimpleStats struct to all zeroes + */ +static void +initSimpleStats(SimpleStats *ss) +{ + memset(ss, 0, sizeof(SimpleStats)); +} + +/* + * Accumulate one value into a SimpleStats struct. + */ +static void +addToSimpleStats(SimpleStats *ss, double val) +{ + if (ss->count == 0 || val < ss->min) + ss->min = val; + if (ss->count == 0 || val > ss->max) + ss->max = val; + ss->count++; + ss->sum += val; + ss->sum2 += val * val; +} + +/* + * Merge two SimpleStats objects + */ +static void +mergeSimpleStats(SimpleStats *acc, SimpleStats *ss) +{ + if (acc->count == 0 || ss->min < acc->min) + acc->min = ss->min; + if (acc->count == 0 || ss->max > acc->max) + acc->max = ss->max; + acc->count += ss->count; + acc->sum += ss->sum; + acc->sum2 += ss->sum2; +} + +/* + * Initialize a StatsData struct to mostly zeroes, with its start time set to + * the given value. 
+ */ +static void +initStats(StatsData *sd, double start_time) +{ + sd->start_time = start_time; + sd->cnt = 0; + sd->skipped = 0; + initSimpleStats(&sd->latency); + initSimpleStats(&sd->lag); +} + +/* + * Accumulate one additional item into the given stats object. + */ +static void +accumStats(StatsData *stats, bool skipped, double lat, double lag) +{ + stats->cnt++; + + if (skipped) + { + /* no latency to record on skipped transactions */ + stats->skipped++; + } + else + { + addToSimpleStats(&stats->latency, lat); + + /* and possibly the same for schedule lag */ + if (throttle_delay) + addToSimpleStats(&stats->lag, lag); + } +} + /* call PQexec() and exit() on failure */ static void executeStatement(PGconn *con, const char *sql) @@ -713,7 +857,7 @@ doConnect(void) if (!conn) { - fprintf(stderr, "Connection to database \"%s\" failed\n", + fprintf(stderr, "connection to database \"%s\" failed\n", dbName); return NULL; } @@ -731,7 +875,7 @@ doConnect(void) /* check to see that the backend connection was successfully made */ if (PQstatus(conn) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database \"%s\" failed:\n%s", + fprintf(stderr, "connection to database \"%s\" failed:\n%s", dbName, PQerrorMessage(conn)); PQfinish(conn); return NULL; @@ -754,33 +898,98 @@ discard_response(CState *state) } while (res); } +/* qsort comparator for Variable array */ static int -compareVariables(const void *v1, const void *v2) +compareVariableNames(const void *v1, const void *v2) { return strcmp(((const Variable *) v1)->name, ((const Variable *) v2)->name); } -static char * -getVariable(CState *st, char *name) +/* Locate a variable by name; returns NULL if unknown */ +static Variable * +lookupVariable(CState *st, char *name) { - Variable key, - *var; + Variable key; /* On some versions of Solaris, bsearch of zero items dumps core */ if (st->nvariables <= 0) return NULL; + /* Sort if we have to */ + if (!st->vars_sorted) + { + qsort((void *) st->variables, st->nvariables, 
sizeof(Variable), + compareVariableNames); + st->vars_sorted = true; + } + + /* Now we can search */ key.name = name; - var = (Variable *) bsearch((void *) &key, - (void *) st->variables, - st->nvariables, - sizeof(Variable), - compareVariables); - if (var != NULL) - return var->value; + return (Variable *) bsearch((void *) &key, + (void *) st->variables, + st->nvariables, + sizeof(Variable), + compareVariableNames); +} + +/* Get the value of a variable, in string form; returns NULL if unknown */ +static char * +getVariable(CState *st, char *name) +{ + Variable *var; + char stringform[64]; + + var = lookupVariable(st, name); + if (var == NULL) + return NULL; /* not found */ + + if (var->value) + return var->value; /* we have it in string form */ + + /* We need to produce a string equivalent of the numeric value */ + Assert(var->is_numeric); + if (var->num_value.type == PGBT_INT) + snprintf(stringform, sizeof(stringform), + INT64_FORMAT, var->num_value.u.ival); else - return NULL; + { + Assert(var->num_value.type == PGBT_DOUBLE); + snprintf(stringform, sizeof(stringform), + "%.*g", DBL_DIG, var->num_value.u.dval); + } + var->value = pg_strdup(stringform); + return var->value; +} + +/* Try to convert variable to numeric form; return false on failure */ +static bool +makeVariableNumeric(Variable *var) +{ + if (var->is_numeric) + return true; /* no work */ + + if (is_an_int(var->value)) + { + setIntValue(&var->num_value, strtoint64(var->value)); + var->is_numeric = true; + } + else /* type should be double */ + { + double dv; + char xs; + + if (sscanf(var->value, "%lf%c", &dv, &xs) != 1) + { + fprintf(stderr, + "malformed variable \"%s\" value: \"%s\"\n", + var->name, var->value); + return false; + } + setDoubleValue(&var->num_value, dv); + var->is_numeric = true; + } + return true; } /* check whether the name consists of alphabets, numerals and underscores. 
*/ @@ -795,26 +1004,20 @@ isLegalVariableName(const char *name) return false; } - return true; + return (i > 0); /* must be non-empty */ } -static int -putVariable(CState *st, const char *context, char *name, char *value) +/* + * Lookup a variable by name, creating it if need be. + * Caller is expected to assign a value to the variable. + * Returns NULL on failure (bad name). + */ +static Variable * +lookupCreateVariable(CState *st, const char *context, char *name) { - Variable key, - *var; - - key.name = name; - /* On some versions of Solaris, bsearch of zero items dumps core */ - if (st->nvariables > 0) - var = (Variable *) bsearch((void *) &key, - (void *) st->variables, - st->nvariables, - sizeof(Variable), - compareVariables); - else - var = NULL; + Variable *var; + var = lookupVariable(st, name); if (var == NULL) { Variable *newvars; @@ -825,10 +1028,12 @@ putVariable(CState *st, const char *context, char *name, char *value) */ if (!isLegalVariableName(name)) { - fprintf(stderr, "%s: invalid variable name '%s'\n", context, name); - return false; + fprintf(stderr, "%s: invalid variable name: \"%s\"\n", + context, name); + return NULL; } + /* Create variable at the end of the array */ if (st->variables) newvars = (Variable *) pg_realloc(st->variables, (st->nvariables + 1) * sizeof(Variable)); @@ -840,27 +1045,72 @@ putVariable(CState *st, const char *context, char *name, char *value) var = &newvars[st->nvariables]; var->name = pg_strdup(name); - var->value = pg_strdup(value); + var->value = NULL; + /* caller is expected to initialize remaining fields */ st->nvariables++; - - qsort((void *) st->variables, st->nvariables, sizeof(Variable), - compareVariables); + /* we don't re-sort the array till we have to */ + st->vars_sorted = false; } - else - { - char *val; - /* dup then free, in case value is pointing at this variable */ - val = pg_strdup(value); + return var; +} + +/* Assign a string value to a variable, creating it if need be */ +/* Returns false on 
failure (bad name) */ +static bool +putVariable(CState *st, const char *context, char *name, const char *value) +{ + Variable *var; + char *val; + var = lookupCreateVariable(st, context, name); + if (!var) + return false; + + /* dup then free, in case value is pointing at this variable */ + val = pg_strdup(value); + + if (var->value) free(var->value); - var->value = val; - } + var->value = val; + var->is_numeric = false; return true; } +/* Assign a numeric value to a variable, creating it if need be */ +/* Returns false on failure (bad name) */ +static bool +putVariableNumber(CState *st, const char *context, char *name, + const PgBenchValue *value) +{ + Variable *var; + + var = lookupCreateVariable(st, context, name); + if (!var) + return false; + + if (var->value) + free(var->value); + var->value = NULL; + var->is_numeric = true; + var->num_value = *value; + + return true; +} + +/* Assign an integer value to a variable, creating it if need be */ +/* Returns false on failure (bad name) */ +static bool +putVariableInt(CState *st, const char *context, char *name, int64 value) +{ + PgBenchValue val; + + setIntValue(&val, value); + return putVariableNumber(st, context, name, &val); +} + static char * parseVariable(const char *sql, int *eaten) { @@ -947,89 +1197,471 @@ getQueryParams(CState *st, const Command *command, const char **params) params[i] = getVariable(st, command->argv[i + 1]); } +/* get a value as an int, tell if there is a problem */ +static bool +coerceToInt(PgBenchValue *pval, int64 *ival) +{ + if (pval->type == PGBT_INT) + { + *ival = pval->u.ival; + return true; + } + else + { + double dval = pval->u.dval; + + Assert(pval->type == PGBT_DOUBLE); + if (dval < PG_INT64_MIN || PG_INT64_MAX < dval) + { + fprintf(stderr, "double to int overflow for %f\n", dval); + return false; + } + *ival = (int64) dval; + return true; + } +} + +/* get a value as a double, or tell if there is a problem */ +static bool +coerceToDouble(PgBenchValue *pval, double *dval) +{ + 
if (pval->type == PGBT_DOUBLE) + { + *dval = pval->u.dval; + return true; + } + else + { + Assert(pval->type == PGBT_INT); + *dval = (double) pval->u.ival; + return true; + } +} + +/* assign an integer value */ +static void +setIntValue(PgBenchValue *pv, int64 ival) +{ + pv->type = PGBT_INT; + pv->u.ival = ival; +} + +/* assign a double value */ +static void +setDoubleValue(PgBenchValue *pv, double dval) +{ + pv->type = PGBT_DOUBLE; + pv->u.dval = dval; +} + +/* maximum number of function arguments */ +#define MAX_FARGS 16 + /* - * Recursive evaluation of an expression in a pgbench script - * using the current state of variables. - * Returns whether the evaluation was ok, - * the value itself is returned through the retval pointer. + * Recursive evaluation of functions */ static bool -evaluateExpr(CState *st, PgBenchExpr *expr, int64 *retval) +evalFunc(TState *thread, CState *st, + PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval) { - switch (expr->etype) + /* evaluate all function arguments */ + int nargs = 0; + PgBenchValue vargs[MAX_FARGS]; + PgBenchExprLink *l = args; + + for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next) + if (!evaluateExpr(thread, st, l->expr, &vargs[nargs])) + return false; + + if (l != NULL) { - case ENODE_INTEGER_CONSTANT: + fprintf(stderr, + "too many function arguments, maximum is %d\n", MAX_FARGS); + return false; + } + + /* then evaluate function */ + switch (func) + { + /* overloaded operators */ + case PGBENCH_ADD: + case PGBENCH_SUB: + case PGBENCH_MUL: + case PGBENCH_DIV: + case PGBENCH_MOD: + { + PgBenchValue *lval = &vargs[0], + *rval = &vargs[1]; + + Assert(nargs == 2); + + /* overloaded type management, double if some double */ + if ((lval->type == PGBT_DOUBLE || + rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD) + { + double ld, + rd; + + if (!coerceToDouble(lval, &ld) || + !coerceToDouble(rval, &rd)) + return false; + + switch (func) + { + case PGBENCH_ADD: + setDoubleValue(retval, ld + 
rd); + return true; + + case PGBENCH_SUB: + setDoubleValue(retval, ld - rd); + return true; + + case PGBENCH_MUL: + setDoubleValue(retval, ld * rd); + return true; + + case PGBENCH_DIV: + setDoubleValue(retval, ld / rd); + return true; + + default: + /* cannot get here */ + Assert(0); + } + } + else /* we have integer operands, or % */ + { + int64 li, + ri; + + if (!coerceToInt(lval, &li) || + !coerceToInt(rval, &ri)) + return false; + + switch (func) + { + case PGBENCH_ADD: + setIntValue(retval, li + ri); + return true; + + case PGBENCH_SUB: + setIntValue(retval, li - ri); + return true; + + case PGBENCH_MUL: + setIntValue(retval, li * ri); + return true; + + case PGBENCH_DIV: + case PGBENCH_MOD: + if (ri == 0) + { + fprintf(stderr, "division by zero\n"); + return false; + } + /* special handling of -1 divisor */ + if (ri == -1) + { + if (func == PGBENCH_DIV) + { + /* overflow check (needed for INT64_MIN) */ + if (li == PG_INT64_MIN) + { + fprintf(stderr, "bigint out of range\n"); + return false; + } + else + setIntValue(retval, -li); + } + else + setIntValue(retval, 0); + return true; + } + /* else divisor is not -1 */ + if (func == PGBENCH_DIV) + setIntValue(retval, li / ri); + else /* func == PGBENCH_MOD */ + setIntValue(retval, li % ri); + + return true; + + default: + /* cannot get here */ + Assert(0); + } + } + } + + /* no arguments */ + case PGBENCH_PI: + setDoubleValue(retval, M_PI); + return true; + + /* 1 overloaded argument */ + case PGBENCH_ABS: { - *retval = expr->u.integer_constant.ival; + PgBenchValue *varg = &vargs[0]; + + Assert(nargs == 1); + + if (varg->type == PGBT_INT) + { + int64 i = varg->u.ival; + + setIntValue(retval, i < 0 ? -i : i); + } + else + { + double d = varg->u.dval; + + Assert(varg->type == PGBT_DOUBLE); + setDoubleValue(retval, d < 0.0 ? 
-d : d); + } + return true; } - case ENODE_VARIABLE: + case PGBENCH_DEBUG: { - char *var; + PgBenchValue *varg = &vargs[0]; - if ((var = getVariable(st, expr->u.variable.varname)) == NULL) + Assert(nargs == 1); + + fprintf(stderr, "debug(script=%d,command=%d): ", + st->use_file, st->state + 1); + + if (varg->type == PGBT_INT) + fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival); + else { - fprintf(stderr, "undefined variable %s\n", - expr->u.variable.varname); + Assert(varg->type == PGBT_DOUBLE); + fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval); + } + + *retval = *varg; + + return true; + } + + /* 1 double argument */ + case PGBENCH_DOUBLE: + case PGBENCH_SQRT: + { + double dval; + + Assert(nargs == 1); + + if (!coerceToDouble(&vargs[0], &dval)) + return false; + + if (func == PGBENCH_SQRT) + dval = sqrt(dval); + + setDoubleValue(retval, dval); + return true; + } + + /* 1 int argument */ + case PGBENCH_INT: + { + int64 ival; + + Assert(nargs == 1); + + if (!coerceToInt(&vargs[0], &ival)) return false; + + setIntValue(retval, ival); + return true; + } + + /* variable number of arguments */ + case PGBENCH_LEAST: + case PGBENCH_GREATEST: + { + bool havedouble; + int i; + + Assert(nargs >= 1); + + /* need double result if any input is double */ + havedouble = false; + for (i = 0; i < nargs; i++) + { + if (vargs[i].type == PGBT_DOUBLE) + { + havedouble = true; + break; + } + } + if (havedouble) + { + double extremum; + + if (!coerceToDouble(&vargs[0], &extremum)) + return false; + for (i = 1; i < nargs; i++) + { + double dval; + + if (!coerceToDouble(&vargs[i], &dval)) + return false; + if (func == PGBENCH_LEAST) + extremum = Min(extremum, dval); + else + extremum = Max(extremum, dval); + } + setDoubleValue(retval, extremum); + } + else + { + int64 extremum; + + if (!coerceToInt(&vargs[0], &extremum)) + return false; + for (i = 1; i < nargs; i++) + { + int64 ival; + + if (!coerceToInt(&vargs[i], &ival)) + return false; + if (func == PGBENCH_LEAST) + 
extremum = Min(extremum, ival); + else + extremum = Max(extremum, ival); + } + setIntValue(retval, extremum); } - *retval = strtoint64(var); return true; } - case ENODE_OPERATOR: + /* random functions */ + case PGBENCH_RANDOM: + case PGBENCH_RANDOM_EXPONENTIAL: + case PGBENCH_RANDOM_GAUSSIAN: { - int64 lval; - int64 rval; + int64 imin, + imax; + + Assert(nargs >= 2); + + if (!coerceToInt(&vargs[0], &imin) || + !coerceToInt(&vargs[1], &imax)) + return false; - if (!evaluateExpr(st, expr->u.operator.lexpr, &lval)) + /* check random range */ + if (imin > imax) + { + fprintf(stderr, "empty range given to random\n"); return false; - if (!evaluateExpr(st, expr->u.operator.rexpr, &rval)) + } + else if (imax - imin < 0 || (imax - imin) + 1 < 0) + { + /* prevent int overflows in random functions */ + fprintf(stderr, "random range is too large\n"); return false; - switch (expr->u.operator.operator) + } + + if (func == PGBENCH_RANDOM) + { + Assert(nargs == 2); + setIntValue(retval, getrand(thread, imin, imax)); + } + else /* gaussian & exponential */ { - case '+': - *retval = lval + rval; - return true; + double param; - case '-': - *retval = lval - rval; - return true; + Assert(nargs == 3); - case '*': - *retval = lval * rval; - return true; + if (!coerceToDouble(&vargs[2], &param)) + return false; - case '/': - if (rval == 0) + if (func == PGBENCH_RANDOM_GAUSSIAN) + { + if (param < MIN_GAUSSIAN_PARAM) { - fprintf(stderr, "division by zero\n"); + fprintf(stderr, + "gaussian parameter must be at least %f " + "(not %f)\n", MIN_GAUSSIAN_PARAM, param); return false; } - *retval = lval / rval; - return true; - case '%': - if (rval == 0) + setIntValue(retval, + getGaussianRand(thread, imin, imax, param)); + } + else /* exponential */ + { + if (param <= 0.0) { - fprintf(stderr, "division by zero\n"); + fprintf(stderr, + "exponential parameter must be greater than zero" + " (got %f)\n", param); return false; } - *retval = lval % rval; - return true; + + setIntValue(retval, + 
getExponentialRand(thread, imin, imax, param)); + } } - fprintf(stderr, "bad operator\n"); - return false; + return true; } default: - break; + /* cannot get here */ + Assert(0); + /* dead code to avoid a compiler warning */ + return false; } +} - fprintf(stderr, "bad expression\n"); - return false; +/* + * Recursive evaluation of an expression in a pgbench script + * using the current state of variables. + * Returns whether the evaluation was ok, + * the value itself is returned through the retval pointer. + */ +static bool +evaluateExpr(TState *thread, CState *st, PgBenchExpr *expr, PgBenchValue *retval) +{ + switch (expr->etype) + { + case ENODE_CONSTANT: + { + *retval = expr->u.constant; + return true; + } + + case ENODE_VARIABLE: + { + Variable *var; + + if ((var = lookupVariable(st, expr->u.variable.varname)) == NULL) + { + fprintf(stderr, "undefined variable \"%s\"\n", + expr->u.variable.varname); + return false; + } + + if (!makeVariableNumeric(var)) + return false; + + *retval = var->num_value; + return true; + } + + case ENODE_FUNCTION: + return evalFunc(thread, st, + expr->u.function.function, + expr->u.function.args, + retval); + + default: + /* internal error which should never occur */ + fprintf(stderr, "unexpected enode type in evaluation: %d\n", + expr->etype); + exit(1); + } } /* @@ -1070,14 +1702,15 @@ runShellCommand(CState *st, char *variable, char **argv, int argc) } else if ((arg = getVariable(st, argv[i] + 1)) == NULL) { - fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]); + fprintf(stderr, "%s: undefined variable \"%s\"\n", + argv[0], argv[i]); return false; } arglen = strlen(arg); if (len + arglen + (i > 0 ? 
1 : 0) >= SHELL_COMMAND_SIZE - 1) { - fprintf(stderr, "%s: too long shell command\n", argv[0]); + fprintf(stderr, "%s: shell command is too long\n", argv[0]); return false; } @@ -1095,7 +1728,7 @@ runShellCommand(CState *st, char *variable, char **argv, int argc) if (system(command)) { if (!timer_exceeded) - fprintf(stderr, "%s: cannot launch shell command\n", argv[0]); + fprintf(stderr, "%s: could not launch shell command\n", argv[0]); return false; } return true; @@ -1104,19 +1737,19 @@ runShellCommand(CState *st, char *variable, char **argv, int argc) /* Execute the command with pipe and read the standard output. */ if ((fp = popen(command, "r")) == NULL) { - fprintf(stderr, "%s: cannot launch shell command\n", argv[0]); + fprintf(stderr, "%s: could not launch shell command\n", argv[0]); return false; } if (fgets(res, sizeof(res), fp) == NULL) { if (!timer_exceeded) - fprintf(stderr, "%s: cannot read the result\n", argv[0]); + fprintf(stderr, "%s: could not read result of shell command\n", argv[0]); (void) pclose(fp); return false; } if (pclose(fp) < 0) { - fprintf(stderr, "%s: cannot close shell command\n", argv[0]); + fprintf(stderr, "%s: could not close shell command\n", argv[0]); return false; } @@ -1126,15 +1759,15 @@ runShellCommand(CState *st, char *variable, char **argv, int argc) endptr++; if (*res == '\0' || *endptr != '\0') { - fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res); + fprintf(stderr, "%s: shell command must return an integer (not \"%s\")\n", + argv[0], res); return false; } - snprintf(res, sizeof(res), "%d", retval); - if (!putVariable(st, "setshell", variable, res)) + if (!putVariableInt(st, "setshell", variable, retval)) return false; #ifdef DEBUG - printf("shell parameter name: %s, value: %s\n", argv[1], res); + printf("shell parameter name: \"%s\", value: \"%s\"\n", argv[1], res); #endif return true; } @@ -1147,10 +1780,8 @@ preparedStatementName(char *buffer, int file, int state) } static bool 
-clientDone(CState *st, bool ok) +clientDone(CState *st) { - (void) ok; /* unused */ - if (st->con != NULL) { PQfinish(st->con); @@ -1159,33 +1790,28 @@ clientDone(CState *st, bool ok) return false; /* always false */ } -static void -agg_vals_init(AggVals *aggs, instr_time start) +/* return a script number with a weighted choice. */ +static int +chooseScript(TState *thread) { - /* basic counters */ - aggs->cnt = 0; /* number of transactions (includes skipped) */ - aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */ - - aggs->sum_latency = 0; /* SUM(latency) */ - aggs->sum2_latency = 0; /* SUM(latency*latency) */ + int i = 0; + int64 w; - /* min and max transaction duration */ - aggs->min_latency = 0; - aggs->max_latency = 0; + if (num_scripts == 1) + return 0; - /* schedule lag counters */ - aggs->sum_lag = 0; - aggs->sum2_lag = 0; - aggs->min_lag = 0; - aggs->max_lag = 0; + w = getrand(thread, 0, total_weight - 1); + do + { + w -= sql_script[i++].weight; + } while (w >= 0); - /* start of the current interval */ - aggs->start_time = INSTR_TIME_GET_DOUBLE(start); + return i - 1; } /* return false iff client should be disconnected */ static bool -doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg) +doCustom(TState *thread, CState *st, StatsData *agg) { PGresult *res; Command **commands; @@ -1197,12 +1823,13 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa * first time it's needed, and reuse the same value throughout this * function after that. This also ensures that e.g. the calculated latency * reported in the log file and in the totals are the same. Zero means - * "not set yet". + * "not set yet". Reset "now" when we step to the next command with "goto + * top", though. */ +top: INSTR_TIME_SET_ZERO(now); -top: - commands = sql_files[st->use_file]; + commands = sql_script[st->use_file].commands; /* * Handle throttling once per transaction by sleeping. 
It is simpler to @@ -1223,6 +1850,10 @@ top: thread->throttle_trigger += wait; st->txn_scheduled = thread->throttle_trigger; + /* stop client if next transaction is beyond pgbench end of execution */ + if (duration > 0 && st->txn_scheduled > end_time) + return clientDone(st); + /* * If this --latency-limit is used, and this slot is already late so * that the transaction will miss the latency limit even if it @@ -1238,18 +1869,15 @@ top: now_us = INSTR_TIME_GET_MICROSEC(now); while (thread->throttle_trigger < now_us - latency_limit) { - thread->throttle_latency_skipped++; - - if (logfile) - doLog(thread, st, logfile, &now, agg, true); - + processXactStats(thread, st, &now, true, agg); + /* next rendez-vous */ wait = getPoissonRand(thread, throttle_delay); thread->throttle_trigger += wait; st->txn_scheduled = thread->throttle_trigger; } } - st->sleeping = 1; + st->sleeping = true; st->throttling = true; st->is_throttled = true; if (debug) @@ -1259,27 +1887,13 @@ top: if (st->sleeping) { /* are we sleeping? */ - int64 now_us; - if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT(now); - now_us = INSTR_TIME_GET_MICROSEC(now); - if (st->txn_scheduled <= now_us) - { - st->sleeping = 0; /* Done sleeping, go ahead with next command */ - if (st->throttling) - { - /* Measure lag of throttled transaction relative to target */ - int64 lag = now_us - st->txn_scheduled; - - thread->throttle_lag += lag; - if (lag > thread->throttle_lag_max) - thread->throttle_lag_max = lag; - st->throttling = false; - } - } - else + if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled) return true; /* Still sleeping, nothing to do here */ + /* Else done sleeping, go ahead with next command */ + st->sleeping = false; + st->throttling = false; } if (st->listen) @@ -1290,8 +1904,8 @@ top: fprintf(stderr, "client %d receiving\n", st->id); if (!PQconsumeInput(st->con)) { /* there's something wrong */ - fprintf(stderr, "Client %d aborted in state %d. 
Probably the backend died while processing.\n", st->id, st->state); - return clientDone(st, false); + fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state); + return clientDone(st); } if (PQisBusy(st->con)) return true; /* don't have the whole result yet */ @@ -1303,47 +1917,23 @@ top: */ if (is_latencies) { - int cnum = commands[st->state]->command_num; - if (INSTR_TIME_IS_ZERO(now)) INSTR_TIME_SET_CURRENT(now); - INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum], - now, st->stmt_begin); - thread->exec_count[cnum]++; + + /* XXX could use a mutex here, but we choose not to */ + addToSimpleStats(&commands[st->state]->stats, + INSTR_TIME_GET_DOUBLE(now) - + INSTR_TIME_GET_DOUBLE(st->stmt_begin)); } /* transaction finished: calculate latency and log the transaction */ if (commands[st->state + 1] == NULL) { - /* only calculate latency if an option is used that needs it */ - if (progress || throttle_delay || latency_limit) - { - int64 latency; - - if (INSTR_TIME_IS_ZERO(now)) - INSTR_TIME_SET_CURRENT(now); - - latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled; - - st->txn_latencies += latency; - - /* - * XXX In a long benchmark run of high-latency transactions, - * this int64 addition eventually overflows. For example, 100 - * threads running 10s transactions will overflow it in 2.56 - * hours. With a more-typical OLTP workload of .1s - * transactions, overflow would take 256 hours. - */ - st->txn_sqlats += latency * latency; - - /* record over the limit transactions if needed. 
*/ - if (latency_limit && latency > latency_limit) - thread->latency_late++; - } - - /* record the time it took in the log */ - if (logfile) - doLog(thread, st, logfile, &now, agg, false); + if (progress || throttle_delay || latency_limit || + per_script_stats || use_log) + processXactStats(thread, st, &now, false, agg); + else + thread->stats.cnt++; } if (commands[st->state]->type == SQL_COMMAND) @@ -1359,10 +1949,10 @@ top: case PGRES_TUPLES_OK: break; /* OK */ default: - fprintf(stderr, "Client %d aborted in state %d: %s", + fprintf(stderr, "client %d aborted in state %d: %s", st->id, st->state, PQerrorMessage(st->con)); PQclear(res); - return clientDone(st, false); + return clientDone(st); } PQclear(res); discard_response(st); @@ -1378,7 +1968,7 @@ top: ++st->cnt; if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) - return clientDone(st, true); /* exit success */ + return clientDone(st); /* exit success */ } /* increment state counter */ @@ -1386,8 +1976,11 @@ top: if (commands[st->state] == NULL) { st->state = 0; - st->use_file = (int) getrand(thread, 0, num_files - 1); - commands = sql_files[st->use_file]; + st->use_file = chooseScript(thread); + commands = sql_script[st->use_file].commands; + if (debug) + fprintf(stderr, "client %d executing script \"%s\"\n", st->id, + sql_script[st->use_file].desc); st->is_throttled = false; /* @@ -1395,9 +1988,9 @@ top: * nothing to listen to right now. When throttling rate limits * are active, a sleep will happen next, as the next transaction * starts. And then in any case the next SQL command will set - * listen back to 1. + * listen back to true. 
*/ - st->listen = 0; + st->listen = false; trans_needs_throttle = (throttle_delay > 0); } } @@ -1410,11 +2003,19 @@ top: INSTR_TIME_SET_CURRENT(start); if ((st->con = doConnect()) == NULL) { - fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id); - return clientDone(st, false); + fprintf(stderr, "client %d aborted while establishing connection\n", + st->id); + return clientDone(st); } INSTR_TIME_SET_CURRENT(end); - INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); + INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start); + + /* Reset session-local state */ + st->listen = false; + st->sleeping = false; + st->throttling = false; + st->is_throttled = false; + memset(st->prepared, 0, sizeof(st->prepared)); } /* @@ -1429,7 +2030,8 @@ top: } /* Record transaction start time under logging, progress or throttling */ - if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0) + if ((use_log || progress || throttle_delay || latency_limit || + per_script_stats) && st->state == 0) { INSTR_TIME_SET_CURRENT(st->txn_begin); @@ -1514,11 +2116,12 @@ top: if (r == 0) { if (debug) - fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]); + fprintf(stderr, "client %d could not send %s\n", + st->id, command->argv[0]); st->ecnt++; } else - st->listen = 1; /* flags that should be listened */ + st->listen = true; /* flags that should be listened */ } else if (commands[st->state]->type == META_COMMAND) { @@ -1534,158 +2137,24 @@ top: fprintf(stderr, "\n"); } - if (pg_strcasecmp(argv[0], "setrandom") == 0) - { - char *var; - int64 min, - max; - double threshold = 0; - char res[64]; - - if (*argv[2] == ':') - { - if ((var = getVariable(st, argv[2] + 1)) == NULL) - { - fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); - st->ecnt++; - return true; - } - min = strtoint64(var); - } - else - min = strtoint64(argv[2]); - -#ifdef NOT_USED - if (min < 0) - { - fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min); - 
st->ecnt++; - return; - } -#endif - - if (*argv[3] == ':') - { - if ((var = getVariable(st, argv[3] + 1)) == NULL) - { - fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); - st->ecnt++; - return true; - } - max = strtoint64(var); - } - else - max = strtoint64(argv[3]); - - if (max < min) - { - fprintf(stderr, "%s: maximum is less than minimum\n", argv[0]); - st->ecnt++; - return true; - } - - /* - * Generate random number functions need to be able to subtract - * max from min and add one to the result without overflowing. - * Since we know max > min, we can detect overflow just by - * checking for a negative result. But we must check both that the - * subtraction doesn't overflow, and that adding one to the result - * doesn't overflow either. - */ - if (max - min < 0 || (max - min) + 1 < 0) - { - fprintf(stderr, "%s: range too large\n", argv[0]); - st->ecnt++; - return true; - } - - if (argc == 4 || /* uniform without or with "uniform" keyword */ - (argc == 5 && pg_strcasecmp(argv[4], "uniform") == 0)) - { -#ifdef DEBUG - printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getrand(thread, min, max)); -#endif - snprintf(res, sizeof(res), INT64_FORMAT, getrand(thread, min, max)); - } - else if (argc == 6 && - ((pg_strcasecmp(argv[4], "gaussian") == 0) || - (pg_strcasecmp(argv[4], "exponential") == 0))) - { - if (*argv[5] == ':') - { - if ((var = getVariable(st, argv[5] + 1)) == NULL) - { - fprintf(stderr, "%s: invalid threshold number %s\n", argv[0], argv[5]); - st->ecnt++; - return true; - } - threshold = strtod(var, NULL); - } - else - threshold = strtod(argv[5], NULL); - - if (pg_strcasecmp(argv[4], "gaussian") == 0) - { - if (threshold < MIN_GAUSSIAN_THRESHOLD) - { - fprintf(stderr, "%s: gaussian threshold must be at least %f\n,", argv[5], MIN_GAUSSIAN_THRESHOLD); - st->ecnt++; - return true; - } -#ifdef DEBUG - printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, 
getGaussianRand(thread, min, max, threshold)); -#endif - snprintf(res, sizeof(res), INT64_FORMAT, getGaussianRand(thread, min, max, threshold)); - } - else if (pg_strcasecmp(argv[4], "exponential") == 0) - { - if (threshold <= 0.0) - { - fprintf(stderr, "%s: exponential threshold must be strictly positive\n,", argv[5]); - st->ecnt++; - return true; - } -#ifdef DEBUG - printf("min: " INT64_FORMAT " max: " INT64_FORMAT " random: " INT64_FORMAT "\n", min, max, getExponentialRand(thread, min, max, threshold)); -#endif - snprintf(res, sizeof(res), INT64_FORMAT, getExponentialRand(thread, min, max, threshold)); - } - } - else /* this means an error somewhere in the parsing phase... */ - { - fprintf(stderr, "%s: unexpected arguments\n", argv[0]); - st->ecnt++; - return true; - } - - if (!putVariable(st, argv[0], argv[1], res)) - { - st->ecnt++; - return true; - } - - st->listen = 1; - } - else if (pg_strcasecmp(argv[0], "set") == 0) + if (pg_strcasecmp(argv[0], "set") == 0) { - char res[64]; PgBenchExpr *expr = commands[st->state]->expr; - int64 result; + PgBenchValue result; - if (!evaluateExpr(st, expr, &result)) + if (!evaluateExpr(thread, st, expr, &result)) { st->ecnt++; return true; } - sprintf(res, INT64_FORMAT, result); - if (!putVariable(st, argv[0], argv[1], res)) + if (!putVariableNumber(st, argv[0], argv[1], &result)) { st->ecnt++; return true; } - st->listen = 1; + st->listen = true; } else if (pg_strcasecmp(argv[0], "sleep") == 0) { @@ -1697,7 +2166,8 @@ top: { if ((var = getVariable(st, argv[1] + 1)) == NULL) { - fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); + fprintf(stderr, "%s: undefined variable \"%s\"\n", + argv[0], argv[1]); st->ecnt++; return true; } @@ -1718,38 +2188,40 @@ top: INSTR_TIME_SET_CURRENT(now); st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now) + usec; - st->sleeping = 1; + st->sleeping = true; - st->listen = 1; + st->listen = true; } else if (pg_strcasecmp(argv[0], "setshell") == 0) { bool ret = runShellCommand(st, 
argv[1], argv + 2, argc - 2); if (timer_exceeded) /* timeout */ - return clientDone(st, true); + return clientDone(st); else if (!ret) /* on error */ { st->ecnt++; return true; } else /* succeeded */ - st->listen = 1; + st->listen = true; } else if (pg_strcasecmp(argv[0], "shell") == 0) { bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); if (timer_exceeded) /* timeout */ - return clientDone(st, true); + return clientDone(st); else if (!ret) /* on error */ { st->ecnt++; return true; } else /* succeeded */ - st->listen = 1; + st->listen = true; } + + /* after a meta command, immediately proceed with next command */ goto top; } @@ -1760,11 +2232,12 @@ top: * print log entry after completing one transaction. */ static void -doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, - bool skipped) +doLog(TState *thread, CState *st, instr_time *now, + StatsData *agg, bool skipped, double latency, double lag) { - double lag; - double latency; + FILE *logfile = thread->logfile; + + Assert(use_log); /* * Skip the log entry if sampling is enabled and this row doesn't belong @@ -1774,118 +2247,42 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, pg_erand48(thread->random_state) > sample_rate) return; - if (INSTR_TIME_IS_ZERO(*now)) - INSTR_TIME_SET_CURRENT(*now); - - latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled); - if (skipped) - lag = latency; - else - lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled); - /* should we aggregate the results or not? */ if (agg_interval > 0) { /* - * Are we still in the same interval? If yes, accumulate the values - * (print them otherwise) + * Loop until we reach the interval of the current transaction, and + * print all the empty intervals in between (this may happen with very + * low tps, e.g. --rate=0.1). 
*/ - if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now)) + while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now)) { - agg->cnt += 1; - if (skipped) + /* print aggregated report to logfile */ + fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f", + agg->start_time, + agg->cnt, + agg->latency.sum, + agg->latency.sum2, + agg->latency.min, + agg->latency.max); + if (throttle_delay) { - /* - * there is no latency to record if the transaction was - * skipped - */ - agg->skipped += 1; + fprintf(logfile, " %.0f %.0f %.0f %.0f", + agg->lag.sum, + agg->lag.sum2, + agg->lag.min, + agg->lag.max); + if (latency_limit) + fprintf(logfile, " " INT64_FORMAT, agg->skipped); } - else - { - agg->sum_latency += latency; - agg->sum2_latency += latency * latency; - - /* first in this aggregation interval */ - if ((agg->cnt == 1) || (latency < agg->min_latency)) - agg->min_latency = latency; + fputc('\n', logfile); - if ((agg->cnt == 1) || (latency > agg->max_latency)) - agg->max_latency = latency; - - /* and the same for schedule lag */ - if (throttle_delay) - { - agg->sum_lag += lag; - agg->sum2_lag += lag * lag; - - if ((agg->cnt == 1) || (lag < agg->min_lag)) - agg->min_lag = lag; - if ((agg->cnt == 1) || (lag > agg->max_lag)) - agg->max_lag = lag; - } - } + /* reset data and move to next interval */ + initStats(agg, agg->start_time + agg_interval); } - else - { - /* - * Loop until we reach the interval of the current transaction - * (and print all the empty intervals in between). - */ - while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now)) - { - /* - * This is a non-Windows branch (thanks to the ifdef in - * usage), so we don't need to handle this in a special way - * (see below). 
- */ - fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f", - agg->start_time, - agg->cnt, - agg->sum_latency, - agg->sum2_latency, - agg->min_latency, - agg->max_latency); - if (throttle_delay) - { - fprintf(logfile, " %.0f %.0f %.0f %.0f", - agg->sum_lag, - agg->sum2_lag, - agg->min_lag, - agg->max_lag); - if (latency_limit) - fprintf(logfile, " %d", agg->skipped); - } - fputc('\n', logfile); - - /* move to the next inteval */ - agg->start_time = agg->start_time + agg_interval; - - /* reset for "no transaction" intervals */ - agg->cnt = 0; - agg->skipped = 0; - agg->min_latency = 0; - agg->max_latency = 0; - agg->sum_latency = 0; - agg->sum2_latency = 0; - agg->min_lag = 0; - agg->max_lag = 0; - agg->sum_lag = 0; - agg->sum2_lag = 0; - } - /* reset the values to include only the current transaction. */ - agg->cnt = 1; - agg->skipped = skipped ? 1 : 0; - agg->min_latency = latency; - agg->max_latency = latency; - agg->sum_latency = skipped ? 0.0 : latency; - agg->sum2_latency = skipped ? 0.0 : latency * latency; - agg->min_lag = lag; - agg->max_lag = lag; - agg->sum_lag = lag; - agg->sum2_lag = lag * lag; - } + /* accumulate the current transaction */ + accumStats(agg, skipped, latency, lag); } else { @@ -1894,21 +2291,21 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, /* This is more than we really ought to know about instr_time */ if (skipped) - fprintf(logfile, "%d %d skipped %d %ld %ld", + fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld", st->id, st->cnt, st->use_file, (long) now->tv_sec, (long) now->tv_usec); else - fprintf(logfile, "%d %d %.0f %d %ld %ld", + fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld", st->id, st->cnt, latency, st->use_file, (long) now->tv_sec, (long) now->tv_usec); #else /* On Windows, instr_time doesn't provide a timestamp anyway */ if (skipped) - fprintf(logfile, "%d %d skipped %d 0 0", + fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0", st->id, st->cnt, st->use_file); else - 
fprintf(logfile, "%d %d %.0f %d 0 0", + fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0", st->id, st->cnt, latency, st->use_file); #endif if (throttle_delay) @@ -1917,6 +2314,48 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg, } } +/* + * Accumulate and report statistics at end of a transaction. + * + * (This is also called when a transaction is late and thus skipped.) + */ +static void +processXactStats(TState *thread, CState *st, instr_time *now, + bool skipped, StatsData *agg) +{ + double latency = 0.0, + lag = 0.0; + + if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now)) + INSTR_TIME_SET_CURRENT(*now); + + if (!skipped) + { + /* compute latency & lag */ + latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled; + lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled; + } + + if (progress || throttle_delay || latency_limit) + { + accumStats(&thread->stats, skipped, latency, lag); + + /* count transactions over the latency limit, if needed */ + if (latency_limit && latency > latency_limit) + thread->latency_late++; + } + else + thread->stats.cnt++; + + if (use_log) + doLog(thread, st, now, agg, skipped, latency, lag); + + /* XXX could use a mutex here, but we choose not to */ + if (per_script_stats) + accumStats(&sql_script[st->use_file].stats, skipped, latency, lag); +} + + /* discard connections */ static void disconnect_all(CState *state, int length) @@ -2317,25 +2756,53 @@ parseQuery(Command *cmd, const char *raw_sql) return true; } +/* + * Simple error-printing function, might be needed by lexer + */ +static void +pgbench_error(const char *fmt,...) 
+{ + va_list ap; + + fflush(stdout); + va_start(ap, fmt); + vfprintf(stderr, _(fmt), ap); + va_end(ap); +} + +/* + * syntax error while parsing a script (in practice, while parsing a + * backslash command, because we don't detect syntax errors in SQL) + * + * source: source of script (filename or builtin-script ID) + * lineno: line number within script (count from 1) + * line: whole line of backslash command, if available + * command: backslash command name, if available + * msg: the actual error message + * more: optional extra message + * column: zero-based column number, or -1 if unknown + */ void -syntax_error(const char *source, const int lineno, +syntax_error(const char *source, int lineno, const char *line, const char *command, - const char *msg, const char *more, const int column) + const char *msg, const char *more, int column) { fprintf(stderr, "%s:%d: %s", source, lineno, msg); if (more != NULL) fprintf(stderr, " (%s)", more); - if (column != -1) - fprintf(stderr, " at column %d", column); - fprintf(stderr, " in command \"%s\"\n", command); + if (column >= 0 && line == NULL) + fprintf(stderr, " at column %d", column + 1); + if (command != NULL) + fprintf(stderr, " in command \"%s\"", command); + fprintf(stderr, "\n"); if (line != NULL) { fprintf(stderr, "%s\n", line); - if (column != -1) + if (column >= 0) { int i; - for (i = 0; i < column - 1; i++) + for (i = 0; i < column; i++) fprintf(stderr, " "); fprintf(stderr, "^ error found here\n"); } @@ -2343,408 +2810,562 @@ syntax_error(const char *source, const int lineno, exit(1); } -/* Parse a command; return a Command struct, or NULL if it's a comment */ +/* + * Parse a SQL command; return a Command struct, or NULL if it's a comment + * + * On entry, psqlscan.l has collected the command into "buf", so we don't + * really need to do much here except check for comment and set up a + * Command struct. 
+ */ static Command * -process_commands(char *buf, const char *source, const int lineno) +process_sql_command(PQExpBuffer buf, const char *source) { - const char delim[] = " \f\n\r\t\v"; + Command *my_command; + char *p; + char *nlpos; + + /* Skip any leading whitespace, as well as "--" style comments */ + p = buf->data; + for (;;) + { + if (isspace((unsigned char) *p)) + p++; + else if (strncmp(p, "--", 2) == 0) + { + p = strchr(p, '\n'); + if (p == NULL) + return NULL; + p++; + } + else + break; + } + + /* If there's nothing but whitespace and comments, we're done */ + if (*p == '\0') + return NULL; + + /* Allocate and initialize Command structure */ + my_command = (Command *) pg_malloc0(sizeof(Command)); + my_command->command_num = num_commands++; + my_command->type = SQL_COMMAND; + my_command->argc = 0; + initSimpleStats(&my_command->stats); + + /* + * If SQL command is multi-line, we only want to save the first line as + * the "line" label. + */ + nlpos = strchr(p, '\n'); + if (nlpos) + { + my_command->line = pg_malloc(nlpos - p + 1); + memcpy(my_command->line, p, nlpos - p); + my_command->line[nlpos - p] = '\0'; + } + else + my_command->line = pg_strdup(p); + + switch (querymode) + { + case QUERY_SIMPLE: + my_command->argv[0] = pg_strdup(p); + my_command->argc++; + break; + case QUERY_EXTENDED: + case QUERY_PREPARED: + if (!parseQuery(my_command, p)) + exit(1); + break; + default: + exit(1); + } + + return my_command; +} - Command *my_commands; +/* + * Parse a backslash command; return a Command struct, or NULL if comment + * + * At call, we have scanned only the initial backslash. 
+ */ +static Command * +process_backslash_command(PsqlScanState sstate, const char *source) +{ + Command *my_command; + PQExpBufferData word_buf; + int word_offset; + int offsets[MAX_ARGS]; /* offsets of argument words */ + int start_offset, + end_offset; + int lineno; int j; - char *p, - *tok; - /* Make the string buf end at the next newline */ - if ((p = strchr(buf, '\n')) != NULL) - *p = '\0'; + initPQExpBuffer(&word_buf); - /* Skip leading whitespace */ - p = buf; - while (isspace((unsigned char) *p)) - p++; + /* Remember location of the backslash */ + start_offset = expr_scanner_offset(sstate) - 1; + lineno = expr_scanner_get_lineno(sstate, start_offset); - /* If the line is empty or actually a comment, we're done */ - if (*p == '\0' || strncmp(p, "--", 2) == 0) + /* Collect first word of command */ + if (!expr_lex_one_word(sstate, &word_buf, &word_offset)) + { + termPQExpBuffer(&word_buf); return NULL; + } /* Allocate and initialize Command structure */ - my_commands = (Command *) pg_malloc(sizeof(Command)); - my_commands->line = pg_strdup(buf); - my_commands->command_num = num_commands++; - my_commands->type = 0; /* until set */ - my_commands->argc = 0; - - if (*p == '\\') + my_command = (Command *) pg_malloc0(sizeof(Command)); + my_command->command_num = num_commands++; + my_command->type = META_COMMAND; + my_command->argc = 0; + initSimpleStats(&my_command->stats); + + /* Save first word (command name) */ + j = 0; + offsets[j] = word_offset; + my_command->argv[j++] = pg_strdup(word_buf.data); + my_command->argc++; + + if (pg_strcasecmp(my_command->argv[0], "set") == 0) { - int max_args = -1; + /* For \set, collect var name, then lex the expression. 
*/ + yyscan_t yyscanner; - my_commands->type = META_COMMAND; + if (!expr_lex_one_word(sstate, &word_buf, &word_offset)) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "missing argument", NULL, -1); - j = 0; - tok = strtok(++p, delim); + offsets[j] = word_offset; + my_command->argv[j++] = pg_strdup(word_buf.data); + my_command->argc++; - if (tok != NULL && pg_strcasecmp(tok, "set") == 0) - max_args = 2; + yyscanner = expr_scanner_init(sstate, source, lineno, start_offset, + my_command->argv[0]); - while (tok != NULL) + if (expr_yyparse(yyscanner) != 0) { - my_commands->cols[j] = tok - buf + 1; - my_commands->argv[j++] = pg_strdup(tok); - my_commands->argc++; - if (max_args >= 0 && my_commands->argc >= max_args) - tok = strtok(NULL, ""); - else - tok = strtok(NULL, delim); + /* dead code: exit done from syntax_error called by yyerror */ + exit(1); } - if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0) - { - /* - * parsing: \setrandom variable min max [uniform] \setrandom - * variable min max (gaussian|exponential) threshold - */ + my_command->expr = expr_parse_result; - if (my_commands->argc < 4) - { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "missing arguments", NULL, -1); - } + /* Get location of the ending newline */ + end_offset = expr_scanner_offset(sstate) - 1; - /* argc >= 4 */ + /* Save line */ + my_command->line = expr_scanner_get_substring(sstate, + start_offset, + end_offset); - if (my_commands->argc == 4 || /* uniform without/with - * "uniform" keyword */ - (my_commands->argc == 5 && - pg_strcasecmp(my_commands->argv[4], "uniform") == 0)) - { - /* nothing to do */ - } - else if ( /* argc >= 5 */ - (pg_strcasecmp(my_commands->argv[4], "gaussian") == 0) || - (pg_strcasecmp(my_commands->argv[4], "exponential") == 0)) - { - if (my_commands->argc < 6) - { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "missing threshold argument", my_commands->argv[4], -1); - } - else if 
(my_commands->argc > 6) - { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "too many arguments", my_commands->argv[4], - my_commands->cols[6]); - } - } - else /* cannot parse, unexpected arguments */ + expr_scanner_finish(yyscanner); + + termPQExpBuffer(&word_buf); + + return my_command; + } + + /* For all other commands, collect remaining words. */ + while (expr_lex_one_word(sstate, &word_buf, &word_offset)) + { + if (j >= MAX_ARGS) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "too many arguments", NULL, -1); + + offsets[j] = word_offset; + my_command->argv[j++] = pg_strdup(word_buf.data); + my_command->argc++; + } + + /* Get location of the ending newline */ + end_offset = expr_scanner_offset(sstate) - 1; + + /* Save line */ + my_command->line = expr_scanner_get_substring(sstate, + start_offset, + end_offset); + + if (pg_strcasecmp(my_command->argv[0], "sleep") == 0) + { + if (my_command->argc < 2) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "missing argument", NULL, -1); + + if (my_command->argc > 3) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "too many arguments", NULL, + offsets[3] - start_offset); + + /* + * Split argument into number and unit to allow "sleep 1ms" etc. We + * don't have to terminate the number argument with null because it + * will be parsed with atoi, which ignores trailing non-digit + * characters. 
+ */ + if (my_command->argc == 2 && my_command->argv[1][0] != ':') + { + char *c = my_command->argv[1]; + + while (isdigit((unsigned char) *c)) + c++; + if (*c) { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "unexpected argument", my_commands->argv[4], - my_commands->cols[4]); + my_command->argv[2] = c; + offsets[2] = offsets[1] + (c - my_command->argv[1]); + my_command->argc = 3; } } - else if (pg_strcasecmp(my_commands->argv[0], "set") == 0) + + if (my_command->argc == 3) { - if (my_commands->argc < 3) - { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "missing argument", NULL, -1); - } + if (pg_strcasecmp(my_command->argv[2], "us") != 0 && + pg_strcasecmp(my_command->argv[2], "ms") != 0 && + pg_strcasecmp(my_command->argv[2], "s") != 0) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "unrecognized time unit, must be us, ms or s", + my_command->argv[2], offsets[2] - start_offset); + } + } + else if (pg_strcasecmp(my_command->argv[0], "setshell") == 0) + { + if (my_command->argc < 3) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "missing argument", NULL, -1); + } + else if (pg_strcasecmp(my_command->argv[0], "shell") == 0) + { + if (my_command->argc < 2) + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "missing command", NULL, -1); + } + else + { + syntax_error(source, lineno, my_command->line, my_command->argv[0], + "invalid command", NULL, -1); + } - expr_scanner_init(my_commands->argv[2], source, lineno, - my_commands->line, my_commands->argv[0], - my_commands->cols[2] - 1); + termPQExpBuffer(&word_buf); - if (expr_yyparse() != 0) - { - /* dead code: exit done from syntax_error called by yyerror */ - exit(1); - } + return my_command; +} + +/* + * Parse a script (either the contents of a file, or a built-in script) + * and add it to the list of scripts. 
+ */ +static void +ParseScript(const char *script, const char *desc, int weight) +{ + ParsedScript ps; + PsqlScanState sstate; + PQExpBufferData line_buf; + int alloc_num; + int index; - my_commands->expr = expr_parse_result; +#define COMMANDS_ALLOC_NUM 128 + alloc_num = COMMANDS_ALLOC_NUM; - expr_scanner_finish(); - } - else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0) - { - if (my_commands->argc < 2) - { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "missing argument", NULL, -1); - } + /* Initialize all fields of ps */ + ps.desc = desc; + ps.weight = weight; + ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num); + initStats(&ps.stats, 0.0); - /* - * Split argument into number and unit to allow "sleep 1ms" etc. - * We don't have to terminate the number argument with null - * because it will be parsed with atoi, which ignores trailing - * non-digit characters. - */ - if (my_commands->argv[1][0] != ':') - { - char *c = my_commands->argv[1]; + /* Prepare to parse script */ + sstate = psql_scan_create(&pgbench_callbacks); - while (isdigit((unsigned char) *c)) - c++; - if (*c) - { - my_commands->argv[2] = c; - if (my_commands->argc < 3) - my_commands->argc = 3; - } - } + /* + * Ideally, we'd scan scripts using the encoding and stdstrings settings + * we get from a DB connection. However, without major rearrangement of + * pgbench's argument parsing, we can't have a DB connection at the time + * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough + * with any backend-safe encoding, though conceivably we could be fooled + * if a script file uses a client-only encoding. We also assume that + * stdstrings should be true, which is a bit riskier. 
+ */ + psql_scan_setup(sstate, script, strlen(script), 0, true); - if (my_commands->argc >= 3) - { - if (pg_strcasecmp(my_commands->argv[2], "us") != 0 && - pg_strcasecmp(my_commands->argv[2], "ms") != 0 && - pg_strcasecmp(my_commands->argv[2], "s") != 0) - { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "unknown time unit, must be us, ms or s", - my_commands->argv[2], my_commands->cols[2]); - } - } + initPQExpBuffer(&line_buf); - /* this should be an error?! */ - for (j = 3; j < my_commands->argc; j++) - fprintf(stderr, "%s: extra argument \"%s\" ignored\n", - my_commands->argv[0], my_commands->argv[j]); - } - else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0) + index = 0; + + for (;;) + { + PsqlScanResult sr; + promptStatus_t prompt; + Command *command; + + resetPQExpBuffer(&line_buf); + + sr = psql_scan(sstate, &line_buf, &prompt); + + /* If we collected a SQL command, process that */ + command = process_sql_command(&line_buf, desc); + if (command) { - if (my_commands->argc < 3) + ps.commands[index] = command; + index++; + + if (index >= alloc_num) { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "missing argument", NULL, -1); + alloc_num += COMMANDS_ALLOC_NUM; + ps.commands = (Command **) + pg_realloc(ps.commands, sizeof(Command *) * alloc_num); } } - else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0) + + /* If we reached a backslash, process that */ + if (sr == PSCAN_BACKSLASH) { - if (my_commands->argc < 1) + command = process_backslash_command(sstate, desc); + if (command) { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "missing command", NULL, -1); + ps.commands[index] = command; + index++; + + if (index >= alloc_num) + { + alloc_num += COMMANDS_ALLOC_NUM; + ps.commands = (Command **) + pg_realloc(ps.commands, sizeof(Command *) * alloc_num); + } } } - else - { - syntax_error(source, lineno, my_commands->line, my_commands->argv[0], - "invalid command", 
NULL, -1); - } - } - else - { - my_commands->type = SQL_COMMAND; - switch (querymode) - { - case QUERY_SIMPLE: - my_commands->argv[0] = pg_strdup(p); - my_commands->argc++; - break; - case QUERY_EXTENDED: - case QUERY_PREPARED: - if (!parseQuery(my_commands, p)) - exit(1); - break; - default: - exit(1); - } + /* Done if we reached EOF */ + if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL) + break; } - return my_commands; + ps.commands[index] = NULL; + + addScript(ps); + + termPQExpBuffer(&line_buf); + psql_scan_finish(sstate); + psql_scan_destroy(sstate); } /* - * Read a line from fd, and return it in a malloc'd buffer. - * Return NULL at EOF. + * Read the entire contents of file fd, and return it in a malloc'd buffer. * * The buffer will typically be larger than necessary, but we don't care - * in this program, because we'll free it as soon as we've parsed the line. + * in this program, because we'll free it as soon as we've parsed the script. */ static char * -read_line_from_file(FILE *fd) +read_file_contents(FILE *fd) { - char tmpbuf[BUFSIZ]; char *buf; size_t buflen = BUFSIZ; size_t used = 0; - buf = (char *) palloc(buflen); - buf[0] = '\0'; + buf = (char *) pg_malloc(buflen); - while (fgets(tmpbuf, BUFSIZ, fd) != NULL) + for (;;) { - size_t thislen = strlen(tmpbuf); + size_t nread; - /* Append tmpbuf to whatever we had already */ - memcpy(buf + used, tmpbuf, thislen + 1); - used += thislen; - - /* Done if we collected a newline */ - if (thislen > 0 && tmpbuf[thislen - 1] == '\n') + nread = fread(buf + used, 1, BUFSIZ, fd); + used += nread; + /* If fread() read less than requested, must be EOF or error */ + if (nread < BUFSIZ) break; - - /* Else, enlarge buf to ensure we can append next bufferload */ + /* Enlarge buf so we can read some more */ buflen += BUFSIZ; buf = (char *) pg_realloc(buf, buflen); } + /* There is surely room for a terminator */ + buf[used] = '\0'; - if (used > 0) - return buf; - - /* Reached EOF */ - free(buf); - return NULL; + return buf; } 
-static int -process_file(char *filename) +/* + * Given a file name, read it and add its script to the list. + * "-" means to read stdin. + * NB: filename must be storage that won't disappear. + */ +static void +process_file(const char *filename, int weight) { -#define COMMANDS_ALLOC_NUM 128 - - Command **my_commands; FILE *fd; - int lineno, - index; char *buf; - int alloc_num; - if (num_files >= MAX_FILES) + /* Slurp the file contents into "buf" */ + if (strcmp(filename, "-") == 0) + fd = stdin; + else if ((fd = fopen(filename, "r")) == NULL) { - fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES); + fprintf(stderr, "could not open file \"%s\": %s\n", + filename, strerror(errno)); exit(1); } - alloc_num = COMMANDS_ALLOC_NUM; - my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num); + buf = read_file_contents(fd); - if (strcmp(filename, "-") == 0) - fd = stdin; - else if ((fd = fopen(filename, "r")) == NULL) + if (ferror(fd)) { - fprintf(stderr, "%s: %s\n", filename, strerror(errno)); - pg_free(my_commands); - return false; + fprintf(stderr, "could not read file \"%s\": %s\n", + filename, strerror(errno)); + exit(1); } - lineno = 0; - index = 0; + if (fd != stdin) + fclose(fd); - while ((buf = read_line_from_file(fd)) != NULL) - { - Command *command; + ParseScript(buf, filename, weight); - lineno += 1; + free(buf); +} - command = process_commands(buf, filename, lineno); +/* Parse the given builtin script and add it to the list. 
*/ +static void +process_builtin(const BuiltinScript *bi, int weight) +{ + ParseScript(bi->script, bi->desc, weight); +} - free(buf); +/* show available builtin scripts */ +static void +listAvailableScripts(void) +{ + int i; - if (command == NULL) - continue; + fprintf(stderr, "Available builtin scripts:\n"); + for (i = 0; i < lengthof(builtin_script); i++) + fprintf(stderr, "\t%s\n", builtin_script[i].name); + fprintf(stderr, "\n"); +} - my_commands[index] = command; - index++; +/* return builtin script "name" if unambiguous, fails if not found */ +static const BuiltinScript * +findBuiltin(const char *name) +{ + int i, + found = 0, + len = strlen(name); + const BuiltinScript *result = NULL; - if (index >= alloc_num) + for (i = 0; i < lengthof(builtin_script); i++) + { + if (strncmp(builtin_script[i].name, name, len) == 0) { - alloc_num += COMMANDS_ALLOC_NUM; - my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num); + result = &builtin_script[i]; + found++; } } - fclose(fd); - my_commands[index] = NULL; + /* ok, unambiguous result */ + if (found == 1) + return result; - sql_files[num_files++] = my_commands; + /* error cases */ + if (found == 0) + fprintf(stderr, "no builtin script found for name \"%s\"\n", name); + else /* found > 1 */ + fprintf(stderr, + "ambiguous builtin name: %d builtin scripts found for prefix \"%s\"\n", found, name); - return true; + listAvailableScripts(); + exit(1); } -static Command ** -process_builtin(char *tb, const char *source) +/* + * Determine the weight specification from a script option (-b, -f), if any, + * and return it as an integer (1 is returned if there's no weight). The + * script name is returned in *script as a malloc'd string. 
+ */ +static int +parseScriptWeight(const char *option, char **script) { -#define COMMANDS_ALLOC_NUM 128 + char *sep; + int weight; - Command **my_commands; - int lineno, - index; - char buf[BUFSIZ]; - int alloc_num; - - alloc_num = COMMANDS_ALLOC_NUM; - my_commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num); - - lineno = 0; - index = 0; - - for (;;) + if ((sep = strrchr(option, WSEP))) { - char *p; - Command *command; - - p = buf; - while (*tb && *tb != '\n') - *p++ = *tb++; - - if (*tb == '\0') - break; - - if (*tb == '\n') - tb++; - - *p = '\0'; - - lineno += 1; + int namelen = sep - option; + long wtmp; + char *badp; + + /* generate the script name */ + *script = pg_malloc(namelen + 1); + strncpy(*script, option, namelen); + (*script)[namelen] = '\0'; + + /* process digits of the weight spec */ + errno = 0; + wtmp = strtol(sep + 1, &badp, 10); + if (errno != 0 || badp == sep + 1 || *badp != '\0') + { + fprintf(stderr, "invalid weight specification: %s\n", sep); + exit(1); + } + if (wtmp > INT_MAX || wtmp < 0) + { + fprintf(stderr, + "weight specification out of range (0 .. 
%u): " INT64_FORMAT "\n", + INT_MAX, (int64) wtmp); + exit(1); + } + weight = wtmp; + } + else + { + *script = pg_strdup(option); + weight = 1; + } - command = process_commands(buf, source, lineno); - if (command == NULL) - continue; + return weight; +} - my_commands[index] = command; - index++; +/* append a script to the list of scripts to process */ +static void +addScript(ParsedScript script) +{ + if (script.commands == NULL || script.commands[0] == NULL) + { + fprintf(stderr, "empty command list for script \"%s\"\n", script.desc); + exit(1); + } - if (index >= alloc_num) - { - alloc_num += COMMANDS_ALLOC_NUM; - my_commands = pg_realloc(my_commands, sizeof(Command *) * alloc_num); - } + if (num_scripts >= MAX_SCRIPTS) + { + fprintf(stderr, "at most %d SQL scripts are allowed\n", MAX_SCRIPTS); + exit(1); } - my_commands[index] = NULL; + sql_script[num_scripts] = script; + num_scripts++; +} + +static void +printSimpleStats(char *prefix, SimpleStats *ss) +{ + /* print NaN if no transactions where executed */ + double latency = ss->sum / ss->count; + double stddev = sqrt(ss->sum2 / ss->count - latency * latency); - return my_commands; + printf("%s average = %.3f ms\n", prefix, 0.001 * latency); + printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev); } /* print out results */ static void -printResults(int ttype, int64 normal_xacts, int nclients, - TState *threads, int nthreads, - instr_time total_time, instr_time conn_total_time, - int64 total_latencies, int64 total_sqlats, - int64 throttle_lag, int64 throttle_lag_max, - int64 throttle_latency_skipped, int64 latency_late) +printResults(TState *threads, StatsData *total, instr_time total_time, + instr_time conn_total_time, int latency_late) { double time_include, tps_include, tps_exclude; - char *s; time_include = INSTR_TIME_GET_DOUBLE(total_time); - tps_include = normal_xacts / time_include; - tps_exclude = normal_xacts / (time_include - - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); - - if (ttype == 0) - 
s = "TPC-B (sort of)"; - else if (ttype == 2) - s = "Update only pgbench_accounts"; - else if (ttype == 1) - s = "SELECT only"; - else - s = "Custom query"; + tps_include = total->cnt / time_include; + tps_exclude = total->cnt / (time_include - + (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients)); - printf("transaction type: %s\n", s); + printf("transaction type: %s\n", + num_scripts == 1 ? sql_script[0].desc : "multiple scripts"); printf("scaling factor: %d\n", scale); printf("query mode: %s\n", QUERYMODE[querymode]); printf("number of clients: %d\n", nclients); @@ -2752,46 +3373,36 @@ printResults(int ttype, int64 normal_xacts, int nclients, if (duration <= 0) { printf("number of transactions per client: %d\n", nxacts); - printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n", - normal_xacts, (int64) nxacts * nclients); + printf("number of transactions actually processed: " INT64_FORMAT "/%d\n", + total->cnt, nxacts * nclients); } else { printf("duration: %d s\n", duration); printf("number of transactions actually processed: " INT64_FORMAT "\n", - normal_xacts); + total->cnt); } /* Remaining stats are nonsensical if we failed to execute any xacts */ - if (normal_xacts <= 0) + if (total->cnt <= 0) return; if (throttle_delay && latency_limit) printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n", - throttle_latency_skipped, - 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts)); + total->skipped, + 100.0 * total->skipped / (total->skipped + total->cnt)); if (latency_limit) - printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n", + printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n", latency_limit / 1000.0, latency_late, - 100.0 * latency_late / (throttle_latency_skipped + normal_xacts)); + 100.0 * latency_late / (total->skipped + total->cnt)); if (throttle_delay || progress || latency_limit) - { - /* compute and show 
latency average and standard deviation */ - double latency = 0.001 * total_latencies / normal_xacts; - double sqlat = (double) total_sqlats / normal_xacts; - - printf("latency average: %.3f ms\n" - "latency stddev: %.3f ms\n", - latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency)); - } + printSimpleStats("latency", &total->latency); else - { /* only an average latency computed from the duration is available */ printf("latency average: %.3f ms\n", - 1000.0 * duration * nclients / normal_xacts); - } + 1000.0 * duration * nclients / total->cnt); if (throttle_delay) { @@ -2802,53 +3413,55 @@ printResults(int ttype, int64 normal_xacts, int nclients, * the database load, or the Poisson throttling process. */ printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n", - 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max); + 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max); } printf("tps = %f (including connections establishing)\n", tps_include); printf("tps = %f (excluding connections establishing)\n", tps_exclude); - /* Report per-command latencies */ - if (is_latencies) + /* Report per-script/command statistics */ + if (per_script_stats || latency_limit || is_latencies) { int i; - for (i = 0; i < num_files; i++) + for (i = 0; i < num_scripts; i++) { - Command **commands; - - if (num_files > 1) - printf("statement latencies in milliseconds, file %d:\n", i + 1); + if (num_scripts > 1) + printf("SQL script %d: %s\n" + " - weight = %d (targets %.1f%% of total)\n" + " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n", + i + 1, sql_script[i].desc, + sql_script[i].weight, + 100.0 * sql_script[i].weight / total_weight, + sql_script[i].stats.cnt, + 100.0 * sql_script[i].stats.cnt / total->cnt, + sql_script[i].stats.cnt / time_include); else - printf("statement latencies in milliseconds:\n"); + printf("script statistics:\n"); - for (commands = sql_files[i]; *commands != NULL; commands++) - { - Command *command = *commands; - int cnum = 
command->command_num; - double total_time; - instr_time total_exec_elapsed; - int total_exec_count; - int t; - - /* Accumulate per-thread data for command */ - INSTR_TIME_SET_ZERO(total_exec_elapsed); - total_exec_count = 0; - for (t = 0; t < nthreads; t++) - { - TState *thread = &threads[t]; + if (latency_limit) + printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n", + sql_script[i].stats.skipped, + 100.0 * sql_script[i].stats.skipped / + (sql_script[i].stats.skipped + sql_script[i].stats.cnt)); - INSTR_TIME_ADD(total_exec_elapsed, - thread->exec_elapsed[cnum]); - total_exec_count += thread->exec_count[cnum]; - } + if (num_scripts > 1) + printSimpleStats(" - latency", &sql_script[i].stats.latency); - if (total_exec_count > 0) - total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count; - else - total_time = 0.0; + /* Report per-command latencies */ + if (is_latencies) + { + Command **commands; - printf("\t%f\t%s\n", total_time, command->line); + printf(" - statement latencies in milliseconds:\n"); + + for (commands = sql_script[i].commands; + *commands != NULL; + commands++) + printf(" %11.3f %s\n", + 1000.0 * (*commands)->stats.sum / + (*commands)->stats.count, + (*commands)->line); } } } @@ -2860,6 +3473,7 @@ main(int argc, char **argv) { static struct option long_options[] = { /* systematic long/short named options */ + {"tpc-b", no_argument, NULL, 'b'}, {"client", required_argument, NULL, 'c'}, {"connect", no_argument, NULL, 'C'}, {"debug", no_argument, NULL, 'd'}, @@ -2870,12 +3484,14 @@ main(int argc, char **argv) {"initialize", no_argument, NULL, 'i'}, {"jobs", required_argument, NULL, 'j'}, {"log", no_argument, NULL, 'l'}, + {"latency-limit", required_argument, NULL, 'L'}, {"no-vacuum", no_argument, NULL, 'n'}, {"port", required_argument, NULL, 'p'}, {"progress", required_argument, NULL, 'P'}, {"protocol", required_argument, NULL, 'M'}, {"quiet", no_argument, NULL, 'q'}, {"report-latencies", no_argument, NULL, 
'r'}, + {"rate", required_argument, NULL, 'R'}, {"scale", required_argument, NULL, 's'}, {"select-only", no_argument, NULL, 'S'}, {"skip-some-updates", no_argument, NULL, 'N'}, @@ -2890,25 +3506,20 @@ main(int argc, char **argv) {"unlogged-tables", no_argument, &unlogged_tables, 1}, {"sampling-rate", required_argument, NULL, 4}, {"aggregate-interval", required_argument, NULL, 5}, - {"rate", required_argument, NULL, 'R'}, - {"latency-limit", required_argument, NULL, 'L'}, + {"progress-timestamp", no_argument, NULL, 6}, {NULL, 0, NULL, 0} }; int c; - int nclients = 1; /* default number of simulated clients */ - int nthreads = 1; /* default number of threads */ int is_init_mode = 0; /* initialize mode? */ int is_no_vacuum = 0; /* no vacuum at all before testing? */ int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ - int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, - * 2: skip update of branches and tellers */ int optindex; - char *filename = NULL; bool scale_given = false; bool benchmarking_option_set = false; bool initialization_option_set = false; + bool internal_script_used = false; CState *state; /* status of clients */ TState *threads; /* array of thread */ @@ -2916,15 +3527,12 @@ main(int argc, char **argv) instr_time start_time; /* start up time */ instr_time total_time; instr_time conn_total_time; - int64 total_xacts = 0; - int64 total_latencies = 0; - int64 total_sqlats = 0; - int64 throttle_lag = 0; - int64 throttle_lag_max = 0; - int64 throttle_latency_skipped = 0; int64 latency_late = 0; + StatsData stats; + int weight; int i; + int nclients_dealt; #ifdef HAVE_GETRLIMIT struct rlimit rlim; @@ -2934,8 +3542,6 @@ main(int argc, char **argv) PGresult *res; char *env; - char val[64]; - progname = get_progname(argv[0]); if (argc > 1) @@ -2968,11 +3574,13 @@ main(int argc, char **argv) memset(state, 0, sizeof(CState)); #ifdef PGXC - while ((c = getopt_long(argc, argv, "ih:knvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, 
&optindex)) != -1) + while ((c = getopt_long(argc, argv, "ih:knvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1) #else - while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1) + while ((c = getopt_long(argc, argv, "ih:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1) #endif { + char *script; + switch (c) { case 'i': @@ -2998,20 +3606,13 @@ main(int argc, char **argv) case 'd': debug++; break; - case 'S': - ttype = 1; - benchmarking_option_set = true; - break; - case 'N': - ttype = 2; - benchmarking_option_set = true; - break; case 'c': benchmarking_option_set = true; nclients = atoi(optarg); if (nclients <= 0 || nclients > MAXCLIENTS) { - fprintf(stderr, "invalid number of clients: %d\n", nclients); + fprintf(stderr, "invalid number of clients: \"%s\"\n", + optarg); exit(1); } #ifdef HAVE_GETRLIMIT @@ -3024,10 +3625,11 @@ main(int argc, char **argv) fprintf(stderr, "getrlimit failed: %s\n", strerror(errno)); exit(1); } - if (rlim.rlim_cur <= (nclients + 2)) + if (rlim.rlim_cur < nclients + 3) { - fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur); - fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n"); + fprintf(stderr, "need at least %d open files, but system limit is %ld\n", + nclients + 3, (long) rlim.rlim_cur); + fprintf(stderr, "Reduce number of clients, or use limit/ulimit to increase the system limit.\n"); exit(1); } #endif /* HAVE_GETRLIMIT */ @@ -3037,9 +3639,17 @@ main(int argc, char **argv) nthreads = atoi(optarg); if (nthreads <= 0) { - fprintf(stderr, "invalid number of threads: %d\n", nthreads); + fprintf(stderr, "invalid number of threads: \"%s\"\n", + optarg); + exit(1); + } +#ifndef ENABLE_THREAD_SAFETY + if (nthreads != 1) + { + fprintf(stderr, "threads are not supported on this platform; use -j1\n"); exit(1); } +#endif /* 
!ENABLE_THREAD_SAFETY */ break; case 'C': benchmarking_option_set = true; @@ -3047,6 +3657,7 @@ main(int argc, char **argv) break; case 'r': benchmarking_option_set = true; + per_script_stats = true; is_latencies = true; break; case 's': @@ -3054,7 +3665,7 @@ main(int argc, char **argv) scale = atoi(optarg); if (scale <= 0) { - fprintf(stderr, "invalid scaling factor: %d\n", scale); + fprintf(stderr, "invalid scaling factor: \"%s\"\n", optarg); exit(1); } break; @@ -3062,13 +3673,14 @@ main(int argc, char **argv) benchmarking_option_set = true; if (duration > 0) { - fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n"); + fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n"); exit(1); } nxacts = atoi(optarg); if (nxacts <= 0) { - fprintf(stderr, "invalid number of transactions: %d\n", nxacts); + fprintf(stderr, "invalid number of transactions: \"%s\"\n", + optarg); exit(1); } break; @@ -3076,13 +3688,13 @@ main(int argc, char **argv) benchmarking_option_set = true; if (nxacts > 0) { - fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n"); + fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both\n"); exit(1); } duration = atoi(optarg); if (duration <= 0) { - fprintf(stderr, "invalid duration: %d\n", duration); + fprintf(stderr, "invalid duration: \"%s\"\n", optarg); exit(1); } break; @@ -3097,12 +3709,34 @@ main(int argc, char **argv) initialization_option_set = true; use_quiet = true; break; + + case 'b': + if (strcmp(optarg, "list") == 0) + { + listAvailableScripts(); + exit(0); + } + + weight = parseScriptWeight(optarg, &script); + process_builtin(findBuiltin(script), weight); + benchmarking_option_set = true; + internal_script_used = true; + break; + + case 'S': + process_builtin(findBuiltin("select-only"), 1); + benchmarking_option_set = true; + internal_script_used = true; + break; + case 'N': + 
process_builtin(findBuiltin("simple-update"), 1); + benchmarking_option_set = true; + internal_script_used = true; + break; case 'f': + weight = parseScriptWeight(optarg, &script); + process_file(script, weight); benchmarking_option_set = true; - ttype = 3; - filename = pg_strdup(optarg); - if (process_file(filename) == false || *sql_files[num_files - 1] == NULL) - exit(1); break; case 'D': { @@ -3112,7 +3746,8 @@ main(int argc, char **argv) if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0') { - fprintf(stderr, "invalid variable definition: %s\n", optarg); + fprintf(stderr, "invalid variable definition: \"%s\"\n", + optarg); exit(1); } @@ -3124,17 +3759,17 @@ main(int argc, char **argv) case 'F': initialization_option_set = true; fillfactor = atoi(optarg); - if ((fillfactor < 10) || (fillfactor > 100)) + if (fillfactor < 10 || fillfactor > 100) { - fprintf(stderr, "invalid fillfactor: %d\n", fillfactor); + fprintf(stderr, "invalid fillfactor: \"%s\"\n", optarg); exit(1); } break; case 'M': benchmarking_option_set = true; - if (num_files > 0) + if (num_scripts > 0) { - fprintf(stderr, "query mode (-M) should be specified before transaction scripts (-f)\n"); + fprintf(stderr, "query mode (-M) should be specified before any transaction scripts (-f or -b)\n"); exit(1); } for (querymode = 0; querymode < NUM_QUERYMODE; querymode++) @@ -3142,7 +3777,8 @@ main(int argc, char **argv) break; if (querymode >= NUM_QUERYMODE) { - fprintf(stderr, "invalid query mode (-M): %s\n", optarg); + fprintf(stderr, "invalid query mode (-M): \"%s\"\n", + optarg); exit(1); } break; @@ -3151,8 +3787,7 @@ main(int argc, char **argv) progress = atoi(optarg); if (progress <= 0) { - fprintf(stderr, - "thread progress delay (-P) must be positive (%s)\n", + fprintf(stderr, "invalid thread progress delay: \"%s\"\n", optarg); exit(1); } @@ -3166,7 +3801,7 @@ main(int argc, char **argv) if (throttle_value <= 0.0) { - fprintf(stderr, "invalid rate limit: %s\n", optarg); + 
fprintf(stderr, "invalid rate limit: \"%s\"\n", optarg); exit(1); } /* Invert rate limit into a time offset */ @@ -3179,7 +3814,8 @@ main(int argc, char **argv) if (limit_ms <= 0.0) { - fprintf(stderr, "invalid latency limit: %s\n", optarg); + fprintf(stderr, "invalid latency limit: \"%s\"\n", + optarg); exit(1); } benchmarking_option_set = true; @@ -3204,24 +3840,29 @@ main(int argc, char **argv) sample_rate = atof(optarg); if (sample_rate <= 0.0 || sample_rate > 1.0) { - fprintf(stderr, "invalid sampling rate: %f\n", sample_rate); + fprintf(stderr, "invalid sampling rate: \"%s\"\n", optarg); exit(1); } break; case 5: #ifdef WIN32 - fprintf(stderr, "--aggregate-interval is not currently supported on Windows"); + fprintf(stderr, "--aggregate-interval is not currently supported on Windows\n"); exit(1); #else benchmarking_option_set = true; agg_interval = atoi(optarg); if (agg_interval <= 0) { - fprintf(stderr, "invalid number of seconds for aggregation: %d\n", agg_interval); + fprintf(stderr, "invalid number of seconds for aggregation: \"%s\"\n", + optarg); exit(1); } #endif break; + case 6: + progress_timestamp = true; + benchmarking_option_set = true; + break; default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); @@ -3229,6 +3870,37 @@ main(int argc, char **argv) } } + /* set default script if none */ + if (num_scripts == 0 && !is_init_mode) + { + process_builtin(findBuiltin("tpcb-like"), 1); + benchmarking_option_set = true; + internal_script_used = true; + } + + /* compute total_weight */ + for (i = 0; i < num_scripts; i++) + /* cannot overflow: weight is 32b, total_weight 64b */ + total_weight += sql_script[i].weight; + + if (total_weight == 0 && !is_init_mode) + { + fprintf(stderr, "total script weight must not be zero\n"); + exit(1); + } + + /* show per script stats if several scripts are used */ + if (num_scripts > 1) + per_script_stats = true; + + /* + * Don't need more threads than there are clients. 
(This is not merely an + * optimization; throttle_delay is calculated incorrectly below if some + * threads have no clients assigned to them.) + */ + if (nthreads > nclients) + nthreads = nclients; + /* compute a per thread delay */ throttle_delay *= nthreads; @@ -3248,7 +3920,7 @@ main(int argc, char **argv) { if (benchmarking_option_set) { - fprintf(stderr, "some options cannot be used in initialization (-i) mode\n"); + fprintf(stderr, "some of the specified options cannot be used in initialization (-i) mode\n"); exit(1); } @@ -3259,7 +3931,7 @@ main(int argc, char **argv) { if (initialization_option_set) { - fprintf(stderr, "some options cannot be used in benchmarking mode\n"); + fprintf(stderr, "some of the specified options cannot be used in benchmarking mode\n"); exit(1); } } @@ -3268,67 +3940,43 @@ main(int argc, char **argv) if (nxacts <= 0 && duration <= 0) nxacts = DEFAULT_NXACTS; - if (nclients % nthreads != 0) - { - fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads); - exit(1); - } - /* --sampling-rate may be used only with -l */ if (sample_rate > 0.0 && !use_log) { - fprintf(stderr, "log sampling rate is allowed only when logging transactions (-l) \n"); + fprintf(stderr, "log sampling (--sampling-rate) is allowed only when logging transactions (-l)\n"); exit(1); } - /* --sampling-rate may must not be used with --aggregate-interval */ + /* --sampling-rate may not be used with --aggregate-interval */ if (sample_rate > 0.0 && agg_interval > 0) { - fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) can't be used at the same time\n"); + fprintf(stderr, "log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time\n"); exit(1); } - if (agg_interval > 0 && (!use_log)) + if (agg_interval > 0 && !use_log) { fprintf(stderr, "log aggregation is allowed only when actually logging transactions\n"); exit(1); } - if ((duration > 0) && 
(agg_interval > duration)) + if (duration > 0 && agg_interval > duration) { - fprintf(stderr, "number of seconds for aggregation (%d) must not be higher that test duration (%d)\n", agg_interval, duration); + fprintf(stderr, "number of seconds for aggregation (%d) must not be higher than test duration (%d)\n", agg_interval, duration); exit(1); } - if ((duration > 0) && (agg_interval > 0) && (duration % agg_interval != 0)) + if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0) { fprintf(stderr, "duration (%d) must be a multiple of aggregation interval (%d)\n", duration, agg_interval); exit(1); } /* - * is_latencies only works with multiple threads in thread-based - * implementations, not fork-based ones, because it supposes that the - * parent can see changes made to the per-thread execution stats by child - * threads. It seems useful enough to accept despite this limitation, but - * perhaps we should FIXME someday (by passing the stats data back up - * through the parent-to-child pipes). - */ -#ifndef ENABLE_THREAD_SAFETY - if (is_latencies && nthreads > 1) - { - fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n"); - exit(1); - } -#endif - - /* * save main process id in the global variable because process id will be * changed after fork. 
*/ main_pid = (int) getpid(); - progress_nclients = nclients; - progress_nthreads = nthreads; if (nclients > 1) { @@ -3343,8 +3991,20 @@ main(int argc, char **argv) state[i].id = i; for (j = 0; j < state[0].nvariables; j++) { - if (!putVariable(&state[i], "startup", state[0].variables[j].name, state[0].variables[j].value)) - exit(1); + Variable *var = &state[0].variables[j]; + + if (var->is_numeric) + { + if (!putVariableNumber(&state[i], "startup", + var->name, &var->num_value)) + exit(1); + } + else + { + if (!putVariable(&state[i], "startup", + var->name, var->value)) + exit(1); + } } } } @@ -3366,12 +4026,12 @@ main(int argc, char **argv) if (PQstatus(con) == CONNECTION_BAD) { - fprintf(stderr, "Connection to database '%s' failed.\n", dbName); + fprintf(stderr, "connection to database \"%s\" failed\n", dbName); fprintf(stderr, "%s", PQerrorMessage(con)); exit(1); } - if (ttype != 3) + if (internal_script_used) { /* * get the scaling factor that should be same as count(*) from @@ -3380,13 +4040,21 @@ main(int argc, char **argv) res = PQexec(con, "select count(*) from pgbench_branches"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { + char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE); + fprintf(stderr, "%s", PQerrorMessage(con)); + if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0) + { + fprintf(stderr, "Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\"\n", PQdb(con)); + } + exit(1); } scale = atoi(PQgetvalue(res, 0, 0)); if (scale < 0) { - fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale); + fprintf(stderr, "invalid count(*) from pgbench_branches: \"%s\"\n", + PQgetvalue(res, 0, 0)); exit(1); } PQclear(res); @@ -3394,7 +4062,7 @@ main(int argc, char **argv) /* warn if we override user-given -s switch */ if (scale_given) fprintf(stderr, - "Scale option ignored, using pgbench_branches table count = %d\n", + "scale option ignored, using count from pgbench_branches table (%d)\n", scale); } @@ 
-3402,12 +4070,11 @@ main(int argc, char **argv) * :scale variables normally get -s or database scale, but don't override * an explicit -D switch */ - if (getVariable(&state[0], "scale") == NULL) + if (lookupVariable(&state[0], "scale") == NULL) { - snprintf(val, sizeof(val), "%d", scale); for (i = 0; i < nclients; i++) { - if (!putVariable(&state[i], "startup", "scale", val)) + if (!putVariableInt(&state[i], "startup", "scale", scale)) exit(1); } } @@ -3416,12 +4083,11 @@ main(int argc, char **argv) * Define a :client_id variable that is unique per connection. But don't * override an explicit -D switch. */ - if (getVariable(&state[0], "client_id") == NULL) + if (lookupVariable(&state[0], "client_id") == NULL) { for (i = 0; i < nclients; i++) { - snprintf(val, sizeof(val), "%d", i); - if (!putVariable(&state[i], "startup", "client_id", val)) + if (!putVariableInt(&state[i], "startup", "client_id", i)) exit(1); } } @@ -3447,81 +4113,31 @@ main(int argc, char **argv) INSTR_TIME_SET_CURRENT(start_time); srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)); - /* process builtin SQL scripts */ - switch (ttype) - { - case 0: -#ifdef PGXC - if (use_branch) - sql_files[0] = process_builtin(tpc_b_bid, - "<builtin: TPC-B (sort of)>" ); - else -#endif - sql_files[0] = process_builtin(tpc_b, - "<builtin: TPC-B (sort of)>"); - num_files = 1; - break; - - case 1: - sql_files[0] = process_builtin(select_only, - "<builtin: select only>"); - num_files = 1; - break; - - case 2: -#ifdef PGXC - if (use_branch) - sql_files[0] = process_builtin(simple_update_bid, - "<builtin: simple update>"); - else -#endif - sql_files[0] = process_builtin(simple_update, - "<builtin: simple update>"); - num_files = 1; - break; - - default: - break; - } - /* set up thread data structures */ threads = (TState *) pg_malloc(sizeof(TState) * nthreads); + nclients_dealt = 0; + for (i = 0; i < nthreads; i++) { TState *thread = &threads[i]; thread->tid = i; - thread->state = &state[nclients / nthreads * 
i]; - thread->nstate = nclients / nthreads; + thread->state = &state[nclients_dealt]; + thread->nstate = + (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i); thread->random_state[0] = random(); thread->random_state[1] = random(); thread->random_state[2] = random(); - thread->throttle_latency_skipped = 0; + thread->logfile = NULL; /* filled in later */ thread->latency_late = 0; + initStats(&thread->stats, 0.0); - if (is_latencies) - { - /* Reserve memory for the thread to store per-command latencies */ - int t; - - thread->exec_elapsed = (instr_time *) - pg_malloc(sizeof(instr_time) * num_commands); - thread->exec_count = (int *) - pg_malloc(sizeof(int) * num_commands); - - for (t = 0; t < num_commands; t++) - { - INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]); - thread->exec_count[t] = 0; - } - } - else - { - thread->exec_elapsed = NULL; - thread->exec_count = NULL; - } + nclients_dealt += thread->nstate; } + /* all clients must be assigned to a thread */ + Assert(nclients_dealt == nclients); + /* get start up time */ INSTR_TIME_SET_CURRENT(start_time); @@ -3530,12 +4146,18 @@ main(int argc, char **argv) setalarm(duration); /* start threads */ +#ifdef ENABLE_THREAD_SAFETY for (i = 0; i < nthreads; i++) { TState *thread = &threads[i]; INSTR_TIME_SET_CURRENT(thread->start_time); + /* compute when to stop */ + if (duration > 0) + end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) + + (int64) 1000000 *duration; + /* the first thread (i = 0) is executed by main thread */ if (i > 0) { @@ -3543,7 +4165,7 @@ main(int argc, char **argv) if (err != 0 || thread->thread == INVALID_THREAD) { - fprintf(stderr, "cannot create thread: %s\n", strerror(err)); + fprintf(stderr, "could not create thread: %s\n", strerror(err)); exit(1); } } @@ -3552,33 +4174,40 @@ main(int argc, char **argv) thread->thread = INVALID_THREAD; } } +#else + INSTR_TIME_SET_CURRENT(threads[0].start_time); + /* compute when to stop */ + if (duration > 0) + end_time = 
INSTR_TIME_GET_MICROSEC(threads[0].start_time) + + (int64) 1000000 *duration; + threads[0].thread = INVALID_THREAD; +#endif /* ENABLE_THREAD_SAFETY */ /* wait for threads and accumulate results */ + initStats(&stats, 0.0); INSTR_TIME_SET_ZERO(conn_total_time); for (i = 0; i < nthreads; i++) { - void *ret = NULL; + TState *thread = &threads[i]; +#ifdef ENABLE_THREAD_SAFETY if (threads[i].thread == INVALID_THREAD) - ret = threadRun(&threads[i]); + /* actually run this thread directly in the main thread */ + (void) threadRun(thread); else - pthread_join(threads[i].thread, &ret); - - if (ret != NULL) - { - TResult *r = (TResult *) ret; - - total_xacts += r->xacts; - total_latencies += r->latencies; - total_sqlats += r->sqlats; - throttle_lag += r->throttle_lag; - throttle_latency_skipped += r->throttle_latency_skipped; - latency_late += r->latency_late; - if (r->throttle_lag_max > throttle_lag_max) - throttle_lag_max = r->throttle_lag_max; - INSTR_TIME_ADD(conn_total_time, r->conn_time); - free(ret); - } + /* wait of other threads. should check that 0 is returned? 
*/ + pthread_join(thread->thread, NULL); +#else + (void) threadRun(thread); +#endif /* ENABLE_THREAD_SAFETY */ + + /* aggregate thread level stats */ + mergeSimpleStats(&stats.latency, &thread->stats.latency); + mergeSimpleStats(&stats.lag, &thread->stats.lag); + stats.cnt += thread->stats.cnt; + stats.skipped += thread->stats.skipped; + latency_late += thread->latency_late; + INSTR_TIME_ADD(conn_total_time, thread->conn_time); } disconnect_all(state, nclients); @@ -3594,10 +4223,7 @@ main(int argc, char **argv) */ INSTR_TIME_SET_CURRENT(total_time); INSTR_TIME_SUBTRACT(total_time, start_time); - printResults(ttype, total_xacts, nclients, threads, nthreads, - total_time, conn_total_time, total_latencies, total_sqlats, - throttle_lag, throttle_lag_max, throttle_latency_skipped, - latency_late); + printResults(threads, &stats, total_time, conn_total_time, latency_late); return 0; } @@ -3607,8 +4233,6 @@ threadRun(void *arg) { TState *thread = (TState *) arg; CState *state = thread->state; - TResult *result; - FILE *logfile = NULL; /* per-thread log file */ instr_time start, end; int nstate = thread->nstate; @@ -3619,13 +4243,8 @@ threadRun(void *arg) int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time); int64 last_report = thread_start; int64 next_report = last_report + (int64) progress * 1000000; - int64 last_count = 0, - last_lats = 0, - last_sqlats = 0, - last_lags = 0, - last_skipped = 0; - - AggVals aggs; + StatsData last, + aggs; /* * Initialize throttling rate target for all of the thread's clients. 
It @@ -3635,12 +4254,8 @@ threadRun(void *arg) */ INSTR_TIME_SET_CURRENT(start); thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start); - thread->throttle_lag = 0; - thread->throttle_lag_max = 0; - - result = pg_malloc(sizeof(TResult)); - INSTR_TIME_SET_ZERO(result->conn_time); + INSTR_TIME_SET_ZERO(thread->conn_time); /* open log file if requested */ if (use_log) @@ -3651,11 +4266,12 @@ threadRun(void *arg) snprintf(logpath, sizeof(logpath), "pgbench_log.%d", main_pid); else snprintf(logpath, sizeof(logpath), "pgbench_log.%d.%d", main_pid, thread->tid); - logfile = fopen(logpath, "w"); + thread->logfile = fopen(logpath, "w"); - if (logfile == NULL) + if (thread->logfile == NULL) { - fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno)); + fprintf(stderr, "could not open logfile \"%s\": %s\n", + logpath, strerror(errno)); goto done; } } @@ -3671,25 +4287,31 @@ threadRun(void *arg) } /* time after thread and connections set up */ - INSTR_TIME_SET_CURRENT(result->conn_time); - INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time); + INSTR_TIME_SET_CURRENT(thread->conn_time); + INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time); - agg_vals_init(&aggs, thread->start_time); + initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time)); + last = aggs; /* send start up queries in async manner */ for (i = 0; i < nstate; i++) { CState *st = &state[i]; - Command **commands = sql_files[st->use_file]; int prev_ecnt = st->ecnt; + Command **commands; - st->use_file = getrand(thread, 0, num_files - 1); - if (!doCustom(thread, st, &result->conn_time, logfile, &aggs)) + st->use_file = chooseScript(thread); + commands = sql_script[st->use_file].commands; + if (debug) + fprintf(stderr, "client %d executing script \"%s\"\n", st->id, + sql_script[st->use_file].desc); + if (!doCustom(thread, st, &aggs)) remains--; /* I've aborted */ if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) { - fprintf(stderr, "Client %d aborted in 
state %d. Execution meta-command failed.\n", i, st->state); + fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n", + i, st->state); remains--; /* I've aborted */ PQfinish(st->con); st->con = NULL; @@ -3710,7 +4332,7 @@ threadRun(void *arg) for (i = 0; i < nstate; i++) { CState *st = &state[i]; - Command **commands = sql_files[st->use_file]; + Command **commands = sql_script[st->use_file].commands; int sock; if (st->con == NULL) @@ -3723,7 +4345,7 @@ threadRun(void *arg) { /* interrupt client which has not started a transaction */ remains--; - st->sleeping = 0; + st->sleeping = false; st->throttling = false; PQfinish(st->con); st->con = NULL; @@ -3755,7 +4377,7 @@ threadRun(void *arg) sock = PQsocket(st->con); if (sock < 0) { - fprintf(stderr, "bad socket: %s\n", strerror(errno)); + fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con)); goto done; } @@ -3765,6 +4387,29 @@ threadRun(void *arg) maxsock = sock; } + /* also wake up to print the next progress report on time */ + if (progress && min_usec > 0 && thread->tid == 0) + { + /* get current time if needed */ + if (now_usec == 0) + { + instr_time now; + + INSTR_TIME_SET_CURRENT(now); + now_usec = INSTR_TIME_GET_MICROSEC(now); + } + + if (now_usec >= next_report) + min_usec = 0; + else if ((next_report - now_usec) < min_usec) + min_usec = next_report - now_usec; + } + + /* + * Sleep until we receive data from the server, or a nap-time + * specified in the script ends, or it's time to print a progress + * report. 
+ */ if (min_usec > 0 && maxsock != -1) { int nsocks; /* return from select(2) */ @@ -3784,7 +4429,7 @@ threadRun(void *arg) if (errno == EINTR) continue; /* must be something wrong */ - fprintf(stderr, "select failed: %s\n", strerror(errno)); + fprintf(stderr, "select() failed: %s\n", strerror(errno)); goto done; } } @@ -3793,87 +4438,37 @@ threadRun(void *arg) for (i = 0; i < nstate; i++) { CState *st = &state[i]; - Command **commands = sql_files[st->use_file]; + Command **commands = sql_script[st->use_file].commands; int prev_ecnt = st->ecnt; - if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) - || commands[st->state]->type == META_COMMAND)) + if (st->con) { - if (!doCustom(thread, st, &result->conn_time, logfile, &aggs)) - remains--; /* I've aborted */ + int sock = PQsocket(st->con); + + if (sock < 0) + { + fprintf(stderr, "invalid socket: %s", + PQerrorMessage(st->con)); + goto done; + } + if (FD_ISSET(sock, &input_mask) || + commands[st->state]->type == META_COMMAND) + { + if (!doCustom(thread, st, &aggs)) + remains--; /* I've aborted */ + } } if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) { - fprintf(stderr, "Client %d aborted in state %d. 
Execution of meta-command failed.\n", i, st->state); + fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n", + i, st->state); remains--; /* I've aborted */ PQfinish(st->con); st->con = NULL; } } -#ifdef PTHREAD_FORK_EMULATION - /* each process reports its own progression */ - if (progress) - { - instr_time now_time; - int64 now; - - INSTR_TIME_SET_CURRENT(now_time); - now = INSTR_TIME_GET_MICROSEC(now_time); - if (now >= next_report) - { - /* generate and show report */ - int64 count = 0, - lats = 0, - sqlats = 0, - skipped = 0; - int64 lags = thread->throttle_lag; - int64 run = now - last_report; - double tps, - total_run, - latency, - sqlat, - stdev, - lag; - - for (i = 0; i < nstate; i++) - { - count += state[i].cnt; - lats += state[i].txn_latencies; - sqlats += state[i].txn_sqlats; - } - - total_run = (now - thread_start) / 1000000.0; - tps = 1000000.0 * (count - last_count) / run; - latency = 0.001 * (lats - last_lats) / (count - last_count); - sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); - stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); - lag = 0.001 * (lags - last_lags) / (count - last_count); - skipped = thread->throttle_latency_skipped - last_skipped; - - fprintf(stderr, - "progress %d: %.1f s, %.1f tps, " - "lat %.3f ms stddev %.3f", - thread->tid, total_run, tps, latency, stdev); - if (throttle_delay) - { - fprintf(stderr, ", lag %.3f ms", lag); - if (latency_limit) - fprintf(stderr, ", skipped " INT64_FORMAT, skipped); - } - fprintf(stderr, "\n"); - - last_count = count; - last_lats = lats; - last_sqlats = sqlats; - last_lags = lags; - last_report = now; - last_skipped = thread->throttle_latency_skipped; - next_report += (int64) progress *1000000; - } - } -#else /* progress report by thread 0 for all threads */ if (progress && thread->tid == 0) { @@ -3885,11 +4480,7 @@ threadRun(void *arg) if (now >= next_report) { /* generate and show report */ - int64 count = 0, - lats = 0, - sqlats = 0, - 
lags = 0, - skipped = 0; + StatsData cur; int64 run = now - last_report; double tps, total_run, @@ -3897,71 +4488,87 @@ threadRun(void *arg) sqlat, lag, stdev; + char tbuf[64]; - for (i = 0; i < progress_nclients; i++) + /* + * Add up the statistics of all threads. + * + * XXX: No locking. There is no guarantee that we get an + * atomic snapshot of the transaction count and latencies, so + * these figures can well be off by a small amount. The + * progress is report's purpose is to give a quick overview of + * how the test is going, so that shouldn't matter too much. + * (If a read from a 64-bit integer is not atomic, you might + * get a "torn" read and completely bogus latencies though!) + */ + initStats(&cur, 0.0); + for (i = 0; i < nthreads; i++) { - count += state[i].cnt; - lats += state[i].txn_latencies; - sqlats += state[i].txn_sqlats; + mergeSimpleStats(&cur.latency, &thread[i].stats.latency); + mergeSimpleStats(&cur.lag, &thread[i].stats.lag); + cur.cnt += thread[i].stats.cnt; + cur.skipped += thread[i].stats.skipped; } - for (i = 0; i < progress_nthreads; i++) - lags += thread[i].throttle_lag; - total_run = (now - thread_start) / 1000000.0; - tps = 1000000.0 * (count - last_count) / run; - latency = 0.001 * (lats - last_lats) / (count - last_count); - sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count); + tps = 1000000.0 * (cur.cnt - last.cnt) / run; + latency = 0.001 * (cur.latency.sum - last.latency.sum) / + (cur.cnt - last.cnt); + sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2) + / (cur.cnt - last.cnt); stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency); - lag = 0.001 * (lags - last_lags) / (count - last_count); - skipped = thread->throttle_latency_skipped - last_skipped; + lag = 0.001 * (cur.lag.sum - last.lag.sum) / + (cur.cnt - last.cnt); + + if (progress_timestamp) + sprintf(tbuf, "%.03f s", + INSTR_TIME_GET_MILLISEC(now_time) / 1000.0); + else + sprintf(tbuf, "%.1f s", total_run); fprintf(stderr, - "progress: %.1f s, %.1f tps, 
" - "lat %.3f ms stddev %.3f", - total_run, tps, latency, stdev); + "progress: %s, %.1f tps, lat %.3f ms stddev %.3f", + tbuf, tps, latency, stdev); + if (throttle_delay) { fprintf(stderr, ", lag %.3f ms", lag); if (latency_limit) - fprintf(stderr, ", " INT64_FORMAT " skipped", skipped); + fprintf(stderr, ", " INT64_FORMAT " skipped", + cur.skipped - last.skipped); } fprintf(stderr, "\n"); - last_count = count; - last_lats = lats; - last_sqlats = sqlats; - last_lags = lags; + last = cur; last_report = now; - last_skipped = thread->throttle_latency_skipped; - next_report += (int64) progress *1000000; + + /* + * Ensure that the next report is in the future, in case + * pgbench/postgres got stuck somewhere. + */ + do + { + next_report += (int64) progress *1000000; + } while (now >= next_report); } } -#endif /* PTHREAD_FORK_EMULATION */ } done: INSTR_TIME_SET_CURRENT(start); disconnect_all(state, nstate); - result->xacts = 0; - result->latencies = 0; - result->sqlats = 0; - for (i = 0; i < nstate; i++) + INSTR_TIME_SET_CURRENT(end); + INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start); + if (thread->logfile) { - result->xacts += state[i].cnt; - result->latencies += state[i].txn_latencies; - result->sqlats += state[i].txn_sqlats; + if (agg_interval) + { + /* log aggregated but not yet reported transactions */ + doLog(thread, state, &end, &aggs, false, 0, 0); + } + fclose(thread->logfile); } - result->throttle_lag = thread->throttle_lag; - result->throttle_lag_max = thread->throttle_lag_max; - result->throttle_latency_skipped = thread->throttle_latency_skipped; - result->latency_late = thread->latency_late; - - INSTR_TIME_SET_CURRENT(end); - INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); - if (logfile) - fclose(logfile); - return result; + return NULL; } /* @@ -3983,90 +4590,6 @@ setalarm(int seconds) alarm(seconds); } -#ifndef ENABLE_THREAD_SAFETY - -/* - * implements pthread using fork. 
- */ - -typedef struct fork_pthread -{ - pid_t pid; - int pipes[2]; -} fork_pthread; - -static int -pthread_create(pthread_t *thread, - pthread_attr_t *attr, - void *(*start_routine) (void *), - void *arg) -{ - fork_pthread *th; - void *ret; - int rc; - - th = (fork_pthread *) pg_malloc(sizeof(fork_pthread)); - if (pipe(th->pipes) < 0) - { - free(th); - return errno; - } - - th->pid = fork(); - if (th->pid == -1) /* error */ - { - free(th); - return errno; - } - if (th->pid != 0) /* in parent process */ - { - close(th->pipes[1]); - *thread = th; - return 0; - } - - /* in child process */ - close(th->pipes[0]); - - /* set alarm again because the child does not inherit timers */ - if (duration > 0) - setalarm(duration); - - ret = start_routine(arg); - rc = write(th->pipes[1], ret, sizeof(TResult)); - (void) rc; - close(th->pipes[1]); - free(th); - exit(0); -} - -static int -pthread_join(pthread_t th, void **thread_return) -{ - int status; - - while (waitpid(th->pid, &status, 0) != th->pid) - { - if (errno != EINTR) - return errno; - } - - if (thread_return != NULL) - { - /* assume result is TResult */ - *thread_return = pg_malloc(sizeof(TResult)); - if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult)) - { - free(*thread_return); - *thread_return = NULL; - } - } - close(th->pipes[0]); - - free(th); - return 0; -} -#endif #else /* WIN32 */ static VOID CALLBACK @@ -4088,7 +4611,7 @@ setalarm(int seconds) win32_timer_callback, NULL, seconds * 1000, 0, WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE)) { - fprintf(stderr, "Failed to set timer\n"); + fprintf(stderr, "failed to set timer\n"); exit(1); } } diff --git a/src/bin/pgbench/pgbench.h b/src/bin/pgbench/pgbench.h index 42e2aae294..ab0f822010 100644 --- a/src/bin/pgbench/pgbench.h +++ b/src/bin/pgbench/pgbench.h @@ -2,7 +2,7 @@ * * pgbench.h * - * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * 
Portions Copyright (c) 1994, Regents of the University of California * *------------------------------------------------------------------------- @@ -11,49 +11,129 @@ #ifndef PGBENCH_H #define PGBENCH_H +#include "fe_utils/psqlscan.h" + +/* + * This file is included outside exprscan.l, in places where we can't see + * flex's definition of typedef yyscan_t. Fortunately, it's documented as + * being "void *", so we can use a macro to keep the function declarations + * here looking like the definitions in exprscan.l. exprparse.y and + * pgbench.c also use this to be able to declare things as "yyscan_t". + */ +#define yyscan_t void * + +/* + * Likewise, we can't see exprparse.y's definition of union YYSTYPE here, + * but for now there's no need to know what the union contents are. + */ +union YYSTYPE; + +/* + * Variable types used in parser. + */ +typedef enum +{ + PGBT_INT, + PGBT_DOUBLE + /* add other types here */ +} PgBenchValueType; + +typedef struct +{ + PgBenchValueType type; + union + { + int64 ival; + double dval; + /* add other types here */ + } u; +} PgBenchValue; + +/* Types of expression nodes */ typedef enum PgBenchExprType { - ENODE_INTEGER_CONSTANT, + ENODE_CONSTANT, ENODE_VARIABLE, - ENODE_OPERATOR + ENODE_FUNCTION } PgBenchExprType; +/* List of operators and callable functions */ +typedef enum PgBenchFunction +{ + PGBENCH_ADD, + PGBENCH_SUB, + PGBENCH_MUL, + PGBENCH_DIV, + PGBENCH_MOD, + PGBENCH_DEBUG, + PGBENCH_ABS, + PGBENCH_LEAST, + PGBENCH_GREATEST, + PGBENCH_INT, + PGBENCH_DOUBLE, + PGBENCH_PI, + PGBENCH_SQRT, + PGBENCH_RANDOM, + PGBENCH_RANDOM_GAUSSIAN, + PGBENCH_RANDOM_EXPONENTIAL +} PgBenchFunction; + typedef struct PgBenchExpr PgBenchExpr; +typedef struct PgBenchExprLink PgBenchExprLink; +typedef struct PgBenchExprList PgBenchExprList; struct PgBenchExpr { PgBenchExprType etype; union { - struct - { - int64 ival; - } integer_constant; + PgBenchValue constant; struct { char *varname; } variable; struct { - char operator; - PgBenchExpr *lexpr; 
- PgBenchExpr *rexpr; - } operator; + PgBenchFunction function; + PgBenchExprLink *args; + } function; } u; }; +/* List of expression nodes */ +struct PgBenchExprLink +{ + PgBenchExpr *expr; + PgBenchExprLink *next; +}; + +struct PgBenchExprList +{ + PgBenchExprLink *head; + PgBenchExprLink *tail; +}; + extern PgBenchExpr *expr_parse_result; -extern int expr_yyparse(void); -extern int expr_yylex(void); -extern void expr_yyerror(const char *str); -extern void expr_scanner_init(const char *str, const char *source, - const int lineno, const char *line, - const char *cmd, const int ecol); -extern void syntax_error(const char *source, const int lineno, const char *line, - const char *cmd, const char *msg, const char *more, - const int col); -extern void expr_scanner_finish(void); +extern int expr_yyparse(yyscan_t yyscanner); +extern int expr_yylex(union YYSTYPE *lvalp, yyscan_t yyscanner); +extern void expr_yyerror(yyscan_t yyscanner, const char *str) pg_attribute_noreturn(); +extern void expr_yyerror_more(yyscan_t yyscanner, const char *str, + const char *more) pg_attribute_noreturn(); +extern bool expr_lex_one_word(PsqlScanState state, PQExpBuffer word_buf, + int *offset); +extern yyscan_t expr_scanner_init(PsqlScanState state, + const char *source, int lineno, int start_offset, + const char *command); +extern void expr_scanner_finish(yyscan_t yyscanner); +extern int expr_scanner_offset(PsqlScanState state); +extern char *expr_scanner_get_substring(PsqlScanState state, + int start_offset, int end_offset); +extern int expr_scanner_get_lineno(PsqlScanState state, int offset); + +extern void syntax_error(const char *source, int lineno, const char *line, + const char *cmd, const char *msg, + const char *more, int col) pg_attribute_noreturn(); extern int64 strtoint64(const char *str); diff --git a/src/bin/pgbench/t/001_pgbench.pl b/src/bin/pgbench/t/001_pgbench.pl new file mode 100644 index 0000000000..34d686ea86 --- /dev/null +++ b/src/bin/pgbench/t/001_pgbench.pl @@ -0,0 
+1,25 @@ +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +# Test concurrent insertion into table with UNIQUE oid column. DDL expects +# GetNewOidWithIndex() to successfully avoid violating uniqueness for indexes +# like pg_class_oid_index and pg_proc_oid_index. This indirectly exercises +# LWLock and spinlock concurrency. This test makes a 5-MiB table. +my $node = get_new_node('main'); +$node->init; +$node->start; +$node->safe_psql('postgres', + 'CREATE UNLOGGED TABLE oid_tbl () WITH OIDS; ' + . 'ALTER TABLE oid_tbl ADD UNIQUE (oid);'); +my $script = $node->basedir . '/pgbench_script'; +append_to_file($script, + 'INSERT INTO oid_tbl SELECT FROM generate_series(1,1000);'); +$node->command_like( + [ qw(pgbench --no-vacuum --client=5 --protocol=prepared + --transactions=25 --file), $script ], + qr{processed: 125/125}, + 'concurrent OID generation'); |
