*/
#ifdef WIN32
-#define FD_SETSIZE 1024 /* set before winsock2.h is included */
-#endif /* ! WIN32 */
+#define FD_SETSIZE 1024 /* must set before winsock2.h is included */
+#endif
#include "postgres_fe.h"
#include "fe_utils/conditional.h"
#include <signal.h>
#include <time.h>
#include <sys/time.h>
+#ifdef HAVE_SYS_RESOURCE_H
+#include <sys/resource.h> /* for getrlimit */
+#endif
+
+/* For testing, PGBENCH_USE_SELECT can be defined to force use of that code */
+#if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT)
+#define POLL_USING_PPOLL
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#else /* no ppoll(), so use select() */
+#define POLL_USING_SELECT
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif
-
-#ifdef HAVE_SYS_RESOURCE_H
-#include <sys/resource.h> /* for getrlimit */
#endif
#ifndef M_PI
#define MM2_MUL_TIMES_8 UINT64CONST(0x35253c9ade8f4ca8)
#define MM2_ROT 47
+/*
+ * Multi-platform socket set implementations
+ */
+
+#ifdef POLL_USING_PPOLL
+#define SOCKET_WAIT_METHOD "ppoll"
+
+typedef struct socket_set
+{
+ int maxfds; /* allocated length of pollfds[] array */
+ int curfds; /* number currently in use */
+ struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER];
+} socket_set;
+
+#endif /* POLL_USING_PPOLL */
+
+#ifdef POLL_USING_SELECT
+#define SOCKET_WAIT_METHOD "select"
+
+typedef struct socket_set
+{
+ int maxfd; /* largest FD currently set in fds */
+ fd_set fds;
+} socket_set;
+
+#endif /* POLL_USING_SELECT */
+
/*
* Multi-platform pthread implementations
*/
/********************************************************************
* some configurable parameters */
-/* max number of clients allowed */
-#ifdef FD_SETSIZE
-#define MAXCLIENTS (FD_SETSIZE - 10)
-#else
-#define MAXCLIENTS 1024
-#endif
-
#define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */
#define LOG_STEP_SECONDS 5 /* seconds between log messages */
static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2);
static void addScript(ParsedScript script);
static void *threadRun(void *arg);
-static void setalarm(int seconds);
static void finishCon(CState *st);
+static void setalarm(int seconds);
+static socket_set *alloc_socket_set(int count);
+static void free_socket_set(socket_set *sa);
+static void clear_socket_set(socket_set *sa);
+static void add_socket_to_set(socket_set *sa, int fd, int idx);
+static int wait_on_socket_set(socket_set *sa, int64 usecs);
+static bool socket_has_input(socket_set *sa, int fd, int idx);
/* callback functions for our flex lexer */
case 'c':
benchmarking_option_set = true;
nclients = atoi(optarg);
- if (nclients <= 0 || nclients > MAXCLIENTS)
+ if (nclients <= 0)
{
fprintf(stderr, "invalid number of clients: \"%s\"\n",
optarg);
end;
int nstate = thread->nstate;
int remains = nstate; /* number of remaining clients */
+ socket_set *sockets = alloc_socket_set(nstate);
int i;
/* for reporting progress: */
/* loop till all clients have terminated */
while (remains > 0)
{
- fd_set input_mask;
- int maxsock; /* max socket number to be waited for */
+ int nsocks; /* number of sockets to be waited for */
int64 min_usec;
int64 now_usec = 0; /* set this only if needed */
- /* identify which client sockets should be checked for input */
- FD_ZERO(&input_mask);
- maxsock = -1;
+ /*
+ * identify which client sockets should be checked for input, and
+ * compute the nearest time (if any) at which we need to wake up.
+ */
+ clear_socket_set(sockets);
+ nsocks = 0;
min_usec = PG_INT64_MAX;
for (i = 0; i < nstate; i++)
{
goto done;
}
- FD_SET(sock, &input_mask);
- if (maxsock < sock)
- maxsock = sock;
+ add_socket_to_set(sockets, sock, nsocks++);
}
else if (st->state != CSTATE_ABORTED &&
st->state != CSTATE_FINISHED)
/*
* If no clients are ready to execute actions, sleep until we receive
- * data from the server, or a nap-time specified in the script ends,
- * or it's time to print a progress report. Update input_mask to show
- * which client(s) received data.
+ * data on some client socket or the timeout (if any) elapses.
*/
if (min_usec > 0)
{
- int nsocks = 0; /* return from select(2) if called */
+ int rc = 0;
if (min_usec != PG_INT64_MAX)
{
- if (maxsock != -1)
+ if (nsocks > 0)
{
- struct timeval timeout;
-
- timeout.tv_sec = min_usec / 1000000;
- timeout.tv_usec = min_usec % 1000000;
- nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout);
+ rc = wait_on_socket_set(sockets, min_usec);
}
else /* nothing active, simple sleep */
{
pg_usleep(min_usec);
}
}
- else /* no explicit delay, select without timeout */
+ else /* no explicit delay, wait without timeout */
{
- nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL);
+ rc = wait_on_socket_set(sockets, 0);
}
- if (nsocks < 0)
+ if (rc < 0)
{
if (errno == EINTR)
{
continue;
}
/* must be something wrong */
- fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno));
goto done;
}
}
else
{
- /* min_usec == 0, i.e. something needs to be executed */
+ /* min_usec <= 0, i.e. something needs to be executed now */
- /* If we didn't call select(), don't try to read any data */
- FD_ZERO(&input_mask);
+ /* If we didn't wait, don't try to read any data */
+ clear_socket_set(sockets);
}
/* ok, advance the state machine of each connection */
+ nsocks = 0;
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
goto done;
}
- if (!FD_ISSET(sock, &input_mask))
+ if (!socket_has_input(sockets, sock, nsocks++))
continue;
}
else if (st->state == CSTATE_FINISHED ||
fclose(thread->logfile);
thread->logfile = NULL;
}
+ free_socket_set(sockets);
return NULL;
}
}
}
+#endif /* WIN32 */
+
+
+/*
+ * These functions provide an abstraction layer that hides the syscall
+ * we use to wait for input on a set of sockets.
+ *
+ * Currently there are two implementations, based on ppoll(2) and select(2).
+ * ppoll() is preferred where available due to its typically higher ceiling
+ * on the number of usable sockets. We do not use the more-widely-available
+ * poll(2) because it only offers millisecond timeout resolution, which could
+ * be problematic with high --rate settings.
+ *
+ * Function APIs:
+ *
+ * alloc_socket_set: allocate an empty socket set with room for up to
+ * "count" sockets.
+ *
+ * free_socket_set: deallocate a socket set.
+ *
+ * clear_socket_set: reset a socket set to empty.
+ *
+ * add_socket_to_set: add socket with indicated FD to slot "idx" in the
+ * socket set. Slots must be filled in order, starting with 0.
+ *
+ * wait_on_socket_set: wait for input on any socket in set, or for timeout
+ * to expire. timeout is measured in microseconds; 0 means wait forever.
+ * Returns result code of underlying syscall (>=0 if OK, else see errno).
+ *
+ * socket_has_input: after waiting, call this to see if given socket has
+ * input. fd and idx parameters should match some previous call to
+ * add_socket_to_set.
+ *
+ * Note that wait_on_socket_set destructively modifies the state of the
+ * socket set. After checking for input, caller must apply clear_socket_set
+ * and add_socket_to_set again before waiting again.
+ */
+
+#ifdef POLL_USING_PPOLL
+
+static socket_set *
+alloc_socket_set(int count)
+{
+ socket_set *sa;
+
+ sa = (socket_set *) pg_malloc0(offsetof(socket_set, pollfds) +
+ sizeof(struct pollfd) * count);
+ sa->maxfds = count;
+ sa->curfds = 0;
+ return sa;
+}
+
+static void
+free_socket_set(socket_set *sa)
+{
+ pg_free(sa);
+}
+
+static void
+clear_socket_set(socket_set *sa)
+{
+ sa->curfds = 0;
+}
+
+static void
+add_socket_to_set(socket_set *sa, int fd, int idx)
+{
+ Assert(idx < sa->maxfds && idx == sa->curfds);
+ sa->pollfds[idx].fd = fd;
+ sa->pollfds[idx].events = POLLIN;
+ sa->pollfds[idx].revents = 0;
+ sa->curfds++;
+}
+
+static int
+wait_on_socket_set(socket_set *sa, int64 usecs)
+{
+ if (usecs > 0)
+ {
+ struct timespec timeout;
+
+ timeout.tv_sec = usecs / 1000000;
+ timeout.tv_nsec = (usecs % 1000000) * 1000;
+ return ppoll(sa->pollfds, sa->curfds, &timeout, NULL);
+ }
+ else
+ {
+ return ppoll(sa->pollfds, sa->curfds, NULL, NULL);
+ }
+}
+
+static bool
+socket_has_input(socket_set *sa, int fd, int idx)
+{
+ /*
+ * In some cases, threadRun will apply clear_socket_set and then try to
+ * apply socket_has_input anyway with arguments that it used before that,
+ * or might've used before that except that it exited its setup loop
+ * early. Hence, if the socket set is empty, silently return false
+ * regardless of the parameters. If it's not empty, we can Assert that
+ * the parameters match a previous call.
+ */
+ if (sa->curfds == 0)
+ return false;
+
+ Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
+ return (sa->pollfds[idx].revents & POLLIN) != 0;
+}
+
+#endif /* POLL_USING_PPOLL */
+
+#ifdef POLL_USING_SELECT
+
+static socket_set *
+alloc_socket_set(int count)
+{
+ return (socket_set *) pg_malloc0(sizeof(socket_set));
+}
+
+static void
+free_socket_set(socket_set *sa)
+{
+ pg_free(sa);
+}
+
+static void
+clear_socket_set(socket_set *sa)
+{
+ FD_ZERO(&sa->fds);
+ sa->maxfd = -1;
+}
+
+static void
+add_socket_to_set(socket_set *sa, int fd, int idx)
+{
+ if (fd < 0 || fd >= FD_SETSIZE)
+ {
+ /*
+ * Doing a hard exit here is a bit grotty, but it doesn't seem worth
+ * complicating the API to make it less grotty.
+ */
+ fprintf(stderr, "too many client connections for select()\n");
+ exit(1);
+ }
+ FD_SET(fd, &sa->fds);
+ if (fd > sa->maxfd)
+ sa->maxfd = fd;
+}
+
+static int
+wait_on_socket_set(socket_set *sa, int64 usecs)
+{
+ if (usecs > 0)
+ {
+ struct timeval timeout;
+
+ timeout.tv_sec = usecs / 1000000;
+ timeout.tv_usec = usecs % 1000000;
+ return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
+ }
+ else
+ {
+ return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
+ }
+}
+
+static bool
+socket_has_input(socket_set *sa, int fd, int idx)
+{
+ return (FD_ISSET(fd, &sa->fds) != 0);
+}
+
+#endif /* POLL_USING_SELECT */
+
+
/* partial pthread implementation for Windows */
+#ifdef WIN32
+
typedef struct win32_pthread
{
HANDLE handle;