diff options
-rwxr-xr-x | configure | 2 | ||||
-rw-r--r-- | configure.in | 2 | ||||
-rw-r--r-- | src/bin/pgbench/pgbench.c | 289 | ||||
-rw-r--r-- | src/include/pg_config.h.in | 3 | ||||
-rw-r--r-- | src/include/pg_config.h.win32 | 3 | ||||
-rw-r--r-- | src/template/linux | 1 |
6 files changed, 258 insertions, 42 deletions
diff --git a/configure b/configure index 9b304023d3d..21ecd2989e6 100755 --- a/configure +++ b/configure @@ -15093,7 +15093,7 @@ fi LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` -for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l +for ac_func in cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l do : as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var" diff --git a/configure.in b/configure.in index 2e60a89502c..8fe6894829e 100644 --- a/configure.in +++ b/configure.in @@ -1562,7 +1562,7 @@ PGAC_FUNC_WCSTOMBS_L LIBS_including_readline="$LIBS" LIBS=`echo "$LIBS" | sed -e 's/-ledit//g' -e 's/-lreadline//g'` -AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) +AC_CHECK_FUNCS([cbrt clock_gettime fdatasync getifaddrs getpeerucred getrlimit mbstowcs_l memmove poll posix_fallocate ppoll pstat pthread_is_threaded_np readlink setproctitle setproctitle_fast setsid shm_open symlink sync_file_range utime utimes wcstombs_l]) AC_REPLACE_FUNCS(fseeko) case $host_os in diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 41b756c0894..ae81abab7f6 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -28,8 +28,8 @@ */ #ifdef WIN32 -#define FD_SETSIZE 1024 /* set before winsock2.h is included */ -#endif /* ! WIN32 */ +#define FD_SETSIZE 1024 /* must set before winsock2.h is included */ +#endif #include "postgres_fe.h" #include "fe_utils/conditional.h" @@ -45,12 +45,21 @@ #include <signal.h> #include <time.h> #include <sys/time.h> +#ifdef HAVE_SYS_RESOURCE_H +#include <sys/resource.h> /* for getrlimit */ +#endif + +/* For testing, PGBENCH_USE_SELECT can be defined to force use of that code */ +#if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT) +#define POLL_USING_PPOLL +#ifdef HAVE_POLL_H +#include <poll.h> +#endif +#else /* no ppoll(), so use select() */ +#define POLL_USING_SELECT #ifdef HAVE_SYS_SELECT_H #include <sys/select.h> #endif - -#ifdef HAVE_SYS_RESOURCE_H -#include <sys/resource.h> /* for getrlimit */ #endif #ifndef M_PI @@ -71,6 +80,33 @@ #define MM2_ROT 47 /* + * Multi-platform socket set implementations + */ + +#ifdef POLL_USING_PPOLL +#define SOCKET_WAIT_METHOD "ppoll" + +typedef struct socket_set +{ + int maxfds; /* allocated length of pollfds[] array */ + int curfds; /* number currently in use */ + struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER]; +} socket_set; + +#endif /* POLL_USING_PPOLL */ + +#ifdef POLL_USING_SELECT +#define SOCKET_WAIT_METHOD "select" + +typedef struct socket_set +{ + int maxfd; /* largest FD currently set in fds */ + fd_set fds; +} socket_set; + +#endif /* POLL_USING_SELECT */ + +/* * Multi-platform pthread implementations */ @@ -93,13 +129,6 @@ static int pthread_join(pthread_t th, void **thread_return); /******************************************************************** * some configurable parameters */ -/* max number of clients allowed */ -#ifdef FD_SETSIZE -#define MAXCLIENTS (FD_SETSIZE - 10) -#else -#define MAXCLIENTS 1024 -#endif - #define DEFAULT_INIT_STEPS "dtgvp" /* default -I setting */ #define LOG_STEP_SECONDS 5 /* seconds between log messages */ @@ -523,8 +552,14 @@ static void processXactStats(TState *thread, CState *st, instr_time *now, static void pgbench_error(const char *fmt,...) pg_attribute_printf(1, 2); static void addScript(ParsedScript script); static void *threadRun(void *arg); -static void setalarm(int seconds); static void finishCon(CState *st); +static void setalarm(int seconds); +static socket_set *alloc_socket_set(int count); +static void free_socket_set(socket_set *sa); +static void clear_socket_set(socket_set *sa); +static void add_socket_to_set(socket_set *sa, int fd, int idx); +static int wait_on_socket_set(socket_set *sa, int64 usecs); +static bool socket_has_input(socket_set *sa, int fd, int idx); /* callback functions for our flex lexer */ @@ -4903,7 +4938,7 @@ main(int argc, char **argv) case 'c': benchmarking_option_set = true; nclients = atoi(optarg); - if (nclients <= 0 || nclients > MAXCLIENTS) + if (nclients <= 0) { fprintf(stderr, "invalid number of clients: \"%s\"\n", optarg); @@ -5606,6 +5641,7 @@ threadRun(void *arg) end; int nstate = thread->nstate; int remains = nstate; /* number of remaining clients */ + socket_set *sockets = alloc_socket_set(nstate); int i; /* for reporting progress: */ @@ -5673,14 +5709,16 @@ threadRun(void *arg) /* loop till all clients have terminated */ while (remains > 0) { - fd_set input_mask; - int maxsock; /* max socket number to be waited for */ + int nsocks; /* number of sockets to be waited for */ int64 min_usec; int64 now_usec = 0; /* set this only if needed */ - /* identify which client sockets should be checked for input */ - FD_ZERO(&input_mask); - maxsock = -1; + /* + * identify which client sockets should be checked for input, and + * compute the nearest time (if any) at which we need to wake up. + */ + clear_socket_set(sockets); + nsocks = 0; min_usec = PG_INT64_MAX; for (i = 0; i < nstate; i++) { @@ -5728,9 +5766,7 @@ threadRun(void *arg) goto done; } - FD_SET(sock, &input_mask); - if (maxsock < sock) - maxsock = sock; + add_socket_to_set(sockets, sock, nsocks++); } else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED) @@ -5764,35 +5800,29 @@ threadRun(void *arg) /* * If no clients are ready to execute actions, sleep until we receive - * data from the server, or a nap-time specified in the script ends, - * or it's time to print a progress report. Update input_mask to show - * which client(s) received data. + * data on some client socket or the timeout (if any) elapses. */ if (min_usec > 0) { - int nsocks = 0; /* return from select(2) if called */ + int rc = 0; if (min_usec != PG_INT64_MAX) { - if (maxsock != -1) + if (nsocks > 0) { - struct timeval timeout; - - timeout.tv_sec = min_usec / 1000000; - timeout.tv_usec = min_usec % 1000000; - nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); + rc = wait_on_socket_set(sockets, min_usec); } else /* nothing active, simple sleep */ { pg_usleep(min_usec); } } - else /* no explicit delay, select without timeout */ + else /* no explicit delay, wait without timeout */ { - nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); + rc = wait_on_socket_set(sockets, 0); } - if (nsocks < 0) + if (rc < 0) { if (errno == EINTR) { @@ -5800,19 +5830,20 @@ threadRun(void *arg) continue; } /* must be something wrong */ - fprintf(stderr, "select() failed: %s\n", strerror(errno)); + fprintf(stderr, "%s() failed: %s\n", SOCKET_WAIT_METHOD, strerror(errno)); goto done; } } else { - /* min_usec == 0, i.e. something needs to be executed */ + /* min_usec <= 0, i.e. something needs to be executed now */ - /* If we didn't call select(), don't try to read any data */ - FD_ZERO(&input_mask); + /* If we didn't wait, don't try to read any data */ + clear_socket_set(sockets); } /* ok, advance the state machine of each connection */ + nsocks = 0; for (i = 0; i < nstate; i++) { CState *st = &state[i]; @@ -5829,7 +5860,7 @@ threadRun(void *arg) goto done; } - if (!FD_ISSET(sock, &input_mask)) + if (!socket_has_input(sockets, sock, nsocks++)) continue; } else if (st->state == CSTATE_FINISHED || @@ -5967,6 +5998,7 @@ done: fclose(thread->logfile); thread->logfile = NULL; } + free_socket_set(sockets); return NULL; } @@ -6025,8 +6057,185 @@ setalarm(int seconds) } } +#endif /* WIN32 */ + + +/* + * These functions provide an abstraction layer that hides the syscall + * we use to wait for input on a set of sockets. + * + * Currently there are two implementations, based on ppoll(2) and select(2). + * ppoll() is preferred where available due to its typically higher ceiling + * on the number of usable sockets. We do not use the more-widely-available + * poll(2) because it only offers millisecond timeout resolution, which could + * be problematic with high --rate settings. + * + * Function APIs: + * + * alloc_socket_set: allocate an empty socket set with room for up to + * "count" sockets. + * + * free_socket_set: deallocate a socket set. + * + * clear_socket_set: reset a socket set to empty. + * + * add_socket_to_set: add socket with indicated FD to slot "idx" in the + * socket set. Slots must be filled in order, starting with 0. + * + * wait_on_socket_set: wait for input on any socket in set, or for timeout + * to expire. timeout is measured in microseconds; 0 means wait forever. + * Returns result code of underlying syscall (>=0 if OK, else see errno). + * + * socket_has_input: after waiting, call this to see if given socket has + * input. fd and idx parameters should match some previous call to + * add_socket_to_set. + * + * Note that wait_on_socket_set destructively modifies the state of the + * socket set. After checking for input, caller must apply clear_socket_set + * and add_socket_to_set again before waiting again. + */ + +#ifdef POLL_USING_PPOLL + +static socket_set * +alloc_socket_set(int count) +{ + socket_set *sa; + + sa = (socket_set *) pg_malloc0(offsetof(socket_set, pollfds) + + sizeof(struct pollfd) * count); + sa->maxfds = count; + sa->curfds = 0; + return sa; +} + +static void +free_socket_set(socket_set *sa) +{ + pg_free(sa); +} + +static void +clear_socket_set(socket_set *sa) +{ + sa->curfds = 0; +} + +static void +add_socket_to_set(socket_set *sa, int fd, int idx) +{ + Assert(idx < sa->maxfds && idx == sa->curfds); + sa->pollfds[idx].fd = fd; + sa->pollfds[idx].events = POLLIN; + sa->pollfds[idx].revents = 0; + sa->curfds++; +} + +static int +wait_on_socket_set(socket_set *sa, int64 usecs) +{ + if (usecs > 0) + { + struct timespec timeout; + + timeout.tv_sec = usecs / 1000000; + timeout.tv_nsec = (usecs % 1000000) * 1000; + return ppoll(sa->pollfds, sa->curfds, &timeout, NULL); + } + else + { + return ppoll(sa->pollfds, sa->curfds, NULL, NULL); + } +} + +static bool +socket_has_input(socket_set *sa, int fd, int idx) +{ + /* + * In some cases, threadRun will apply clear_socket_set and then try to + * apply socket_has_input anyway with arguments that it used before that, + * or might've used before that except that it exited its setup loop + * early. Hence, if the socket set is empty, silently return false + * regardless of the parameters. If it's not empty, we can Assert that + * the parameters match a previous call. + */ + if (sa->curfds == 0) + return false; + + Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd); + return (sa->pollfds[idx].revents & POLLIN) != 0; +} + +#endif /* POLL_USING_PPOLL */ + +#ifdef POLL_USING_SELECT + +static socket_set * +alloc_socket_set(int count) +{ + return (socket_set *) pg_malloc0(sizeof(socket_set)); +} + +static void +free_socket_set(socket_set *sa) +{ + pg_free(sa); +} + +static void +clear_socket_set(socket_set *sa) +{ + FD_ZERO(&sa->fds); + sa->maxfd = -1; +} + +static void +add_socket_to_set(socket_set *sa, int fd, int idx) +{ + if (fd < 0 || fd >= FD_SETSIZE) + { + /* + * Doing a hard exit here is a bit grotty, but it doesn't seem worth + * complicating the API to make it less grotty. + */ + fprintf(stderr, "too many client connections for select()\n"); + exit(1); + } + FD_SET(fd, &sa->fds); + if (fd > sa->maxfd) + sa->maxfd = fd; +} + +static int +wait_on_socket_set(socket_set *sa, int64 usecs) +{ + if (usecs > 0) + { + struct timeval timeout; + + timeout.tv_sec = usecs / 1000000; + timeout.tv_usec = usecs % 1000000; + return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout); + } + else + { + return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL); + } +} + +static bool +socket_has_input(socket_set *sa, int fd, int idx) +{ + return (FD_ISSET(fd, &sa->fds) != 0); +} + +#endif /* POLL_USING_SELECT */ + + /* partial pthread implementation for Windows */ +#ifdef WIN32 + typedef struct win32_pthread { HANDLE handle; diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in index 4094e22776c..5d4079609cc 100644 --- a/src/include/pg_config.h.in +++ b/src/include/pg_config.h.in @@ -443,6 +443,9 @@ /* Define to 1 if the assembler supports PPC's LWARX mutex hint bit. */ #undef HAVE_PPC_LWARX_MUTEX_HINT +/* Define to 1 if you have the `ppoll' function. */ +#undef HAVE_PPOLL + /* Define to 1 if you have the `pstat' function. */ #undef HAVE_PSTAT diff --git a/src/include/pg_config.h.win32 b/src/include/pg_config.h.win32 index 6618b435874..182698a0872 100644 --- a/src/include/pg_config.h.win32 +++ b/src/include/pg_config.h.win32 @@ -327,6 +327,9 @@ /* Define to 1 if you have the `posix_fallocate' function. */ /* #undef HAVE_POSIX_FALLOCATE */ +/* Define to 1 if you have the `ppoll' function. */ +/* #undef HAVE_PPOLL */ + /* Define to 1 if you have the `pstat' function. */ /* #undef HAVE_PSTAT */ diff --git a/src/template/linux b/src/template/linux index f820bf7280f..e39290845ad 100644 --- a/src/template/linux +++ b/src/template/linux @@ -6,6 +6,7 @@ if test x"$PREFERRED_SEMAPHORES" = x"" ; then fi # Force _GNU_SOURCE on; plperl is broken with Perl 5.8.0 otherwise +# This is also required for ppoll(2), and perhaps other things CPPFLAGS="$CPPFLAGS -D_GNU_SOURCE" # If --enable-profiling is specified, we need -DLINUX_PROFILE |