static int wait_on_socket_set(socket_set *sa, int64 usecs);
static bool socket_has_input(socket_set *sa, int fd, int idx);
+/* callback used to build rows for COPY during data loading */
+typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
/* callback functions for our flex lexer */
static const PsqlScanCallbacks pgbench_callbacks = {
"pgbench_tellers");
}
-/*
- * Fill the standard tables with some data generated and sent from the client
- */
static void
-initGenerateDataClientSide(PGconn *con)
+initBranch(PQExpBufferData *sql, int64 curr)
{
- PQExpBufferData sql;
+ /* "filler" column uses NULL */
+ printfPQExpBuffer(sql,
+ INT64_FORMAT "\t0\t\\N\n",
+ curr + 1);
+}
+
+static void
+initTeller(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column uses NULL */
+ printfPQExpBuffer(sql,
+ INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
+ curr + 1, curr / ntellers + 1);
+}
+
+static void
+initAccount(PQExpBufferData *sql, int64 curr)
+{
+ /* "filler" column defaults to blank padded empty string */
+ printfPQExpBuffer(sql,
+ INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+ curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTable(PGconn *con, const char *table, int64 base,
+ initRowMethod init_row)
+{
+ int n;
+ int64 k;
+ int chars = 0;
PGresult *res;
- int i;
- int64 k;
- char *copy_statement;
+ PQExpBufferData sql;
+ char copy_statement[256];
+ const char *copy_statement_fmt = "copy %s from stdin";
+ int64 total = base * scale;
/* used to track elapsed time and estimate of the remaining time */
pg_time_usec_t start;
/* Stay on the same line if reporting to a terminal */
char eol = isatty(fileno(stderr)) ? '\r' : '\n';
- fprintf(stderr, "generating data (client-side)...\n");
-
- /*
- * we do all of this in one transaction to enable the backend's
- * data-loading optimizations
- */
- executeStatement(con, "begin");
-
- /* truncate away any old data */
- initTruncateTables(con);
-
initPQExpBuffer(&sql);
/*
- * fill branches, tellers, accounts in that order in case foreign keys
- * already exist
+ * Use COPY with FREEZE on v14 and later for all the tables except
+ * pgbench_accounts when it is partitioned.
*/
- for (i = 0; i < nbranches * scale; i++)
+ if (PQserverVersion(con) >= 140000)
{
- /* "filler" column defaults to NULL */
- printfPQExpBuffer(&sql,
- "insert into pgbench_branches(bid,bbalance) values(%d,0)",
- i + 1);
- executeStatement(con, sql.data);
+ if (strcmp(table, "pgbench_accounts") != 0 ||
+ partitions == 0)
+ copy_statement_fmt = "copy %s from stdin with (freeze on)";
}
- for (i = 0; i < ntellers * scale; i++)
- {
- /* "filler" column defaults to NULL */
- printfPQExpBuffer(&sql,
- "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
- i + 1, i / ntellers + 1);
- executeStatement(con, sql.data);
- }
-
- /*
- * accounts is big enough to be worth using COPY and tracking runtime
- */
-
- /* use COPY with FREEZE on v14 and later without partitioning */
- if (partitions == 0 && PQserverVersion(con) >= 140000)
- copy_statement = "copy pgbench_accounts from stdin with (freeze on)";
- else
- copy_statement = "copy pgbench_accounts from stdin";
+ n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
+ if (n == -1)
+ pg_fatal("invalid format string");
+ else if ((size_t) n >= sizeof(copy_statement))
+ pg_fatal("invalid buffer size: must be at least %d characters long", n);
res = PQexec(con, copy_statement);
if (PQresultStatus(res) != PGRES_COPY_IN)
pg_fatal("unexpected copy in result");
PQclear(res);
start = pg_time_now();
- for (k = 0; k < (int64) naccounts * scale; k++)
+ for (k = 0; k < total; k++)
{
int64 j = k + 1;
- /* "filler" column defaults to blank padded empty string */
- printfPQExpBuffer(&sql,
- INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
- j, k / naccounts + 1);
+ init_row(&sql, k);
if (PQputline(con, sql.data))
pg_fatal("PQputline failed");
if ((!use_quiet) && (j % 100000 == 0))
{
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
- double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+ double remaining_sec = ((double) total - j) * elapsed_sec / j;
- fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
- j, (int64) naccounts * scale,
- (int) (((int64) j * 100) / (naccounts * (int64) scale)),
- elapsed_sec, remaining_sec, eol);
+ chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
+ j, total,
+ (int) ((j * 100) / total),
+ table, elapsed_sec, remaining_sec, eol);
}
/* let's not call the timing for each row, but only each 100 rows */
else if (use_quiet && (j % 100 == 0))
{
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
- double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+ double remaining_sec = ((double) total - j) * elapsed_sec / j;
/* have we reached the next interval (or end)? */
- if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
+ if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
{
- fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
- j, (int64) naccounts * scale,
- (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec, eol);
+ chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
+ j, total,
+ (int) ((j * 100) / total),
+ table, elapsed_sec, remaining_sec, eol);
/* skip to the next interval */
log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
}
}
- if (eol != '\n')
- fputc('\n', stderr); /* Need to move to next line */
+ if (chars != 0 && eol != '\n')
+ fprintf(stderr, "%*c\r", chars - 1, ' '); /* Clear the current line */
if (PQputline(con, "\\.\n"))
pg_fatal("very last PQputline failed");
if (PQendcopy(con))
pg_fatal("PQendcopy failed");
termPQExpBuffer(&sql);
+}
+
+/*
+ * Fill the standard tables with some data generated and sent from the client.
+ *
+ * The filler column is NULL in pgbench_branches and pgbench_tellers, and is
+ * a blank-padded string in pgbench_accounts.
+ */
+static void
+initGenerateDataClientSide(PGconn *con)
+{
+ fprintf(stderr, "generating data (client-side)...\n");
+
+ /*
+ * we do all of this in one transaction to enable the backend's
+ * data-loading optimizations
+ */
+ executeStatement(con, "begin");
+
+ /* truncate away any old data */
+ initTruncateTables(con);
+
+ /*
+ * fill branches, tellers, accounts in that order in case foreign keys
+ * already exist
+ */
+ initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
+ initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
+ initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
executeStatement(con, "commit");
}