diff options
| author | Marko Kreen | 2007-12-20 22:13:45 +0000 |
|---|---|---|
| committer | Marko Kreen | 2007-12-20 22:13:45 +0000 |
| commit | 777f778ec18675f0d025dcf427bfad526fad60d9 (patch) | |
| tree | a529deacde0f8486176fa05d6c0aeb7a3c014830 /src/execute.c | |
| parent | ae5ec4d4a5e5b4abce608f34057839223e4664c7 (diff) | |
replace select() with poll() to allow large fd values
Diffstat (limited to 'src/execute.c')
| -rw-r--r-- | src/execute.c | 109 |
1 files changed, 48 insertions, 61 deletions
diff --git a/src/execute.c b/src/execute.c index 02361c1..0e75c6a 100644 --- a/src/execute.c +++ b/src/execute.c @@ -28,7 +28,8 @@ #include "plproxy.h" #include <sys/time.h> -#include <sys/select.h> + +#include "poll_compat.h" /* some error happened */ static void @@ -198,22 +199,12 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now) { time_t t; int res; - fd_set fds; - int fd; - struct timeval notimeout = {0, 0}; + struct pollfd pfd; ProxyConfig *cf = &func->cur_cluster->config; if (PQstatus(conn->db) != CONNECTION_OK) return false; - fd = PQsocket(conn->db); - if (fd < 0) - { - elog(WARNING, "libpq socket lost: fd=%d, err=%s", - fd, PQerrorMessage(conn->db)); - return false; - } - /* check if too old */ if (cf->connection_lifetime > 0) { @@ -232,16 +223,11 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now) * are events pending. If there are drop the connection. */ intr_loop: - /* just in case detect if too many fds */ -#ifdef FD_SETSIZE - if (fd >= FD_SETSIZE) - plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd=%d", - FD_SETSIZE, fd); -#endif - - FD_ZERO(&fds); - FD_SET(fd, &fds); - res = select(fd + 1, &fds, NULL, NULL, ¬imeout); + pfd.fd = PQsocket(conn->db); + pfd.events = POLLIN; + pfd.revents = 0; + + res = poll(&pfd, 1, 0); if (res > 0) { elog(WARNING, "PL/Proxy: detected unstable connection"); @@ -409,18 +395,32 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn) static int poll_conns(ProxyFunction *func, ProxyCluster *cluster) { + static struct pollfd *pfd_cache = NULL; + static int pfd_allocated = 0; + int i, res, - fd, - fd_max = 0; - fd_set read_fds; - fd_set write_fds; - fd_set *cur_set = NULL; - struct timeval timeout; + fd; ProxyConnection *conn; + struct pollfd *pf; + int numfds = 0; + int ev = 0; - FD_ZERO(&read_fds); - FD_ZERO(&write_fds); + if (pfd_allocated < cluster->conn_count) + { + struct pollfd *tmp; + int num = cluster->conn_count; + if (num < 64) + num = 64; + if (pfd_cache == NULL) + tmp = malloc(num * sizeof(struct pollfd)); + else + tmp = realloc(pfd_cache, num * sizeof(struct pollfd)); + if (!tmp) + elog(ERROR, "no mem for pollfd cache"); + pfd_cache = tmp; + pfd_allocated = num; + } for (i = 0; i < cluster->conn_count; i++) { @@ -437,54 +437,40 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) continue; case C_CONNECT_READ: case C_QUERY_READ: - cur_set = &read_fds; + ev = POLLIN; break; case C_CONNECT_WRITE: case C_QUERY_WRITE: - cur_set = &write_fds; + ev = POLLOUT; break; } /* add fd to proper set */ - fd = PQsocket(conn->db); - if (fd > fd_max) - fd_max = fd; - else if (fd < 0) - plproxy_error(func, "libpq has lost its socket: fd=%d err=%s", - fd, PQerrorMessage(conn->db)); - FD_SET(fd, cur_set); + pf = pfd_cache + numfds++; + pf->fd = PQsocket(conn->db); + pf->events = ev; + pf->revents = 0; } - /* set timeout */ - timeout.tv_sec = 1; - timeout.tv_usec = 0; - - /* just in case detect if too many fds */ -#ifdef FD_SETSIZE - if (fd_max > FD_SETSIZE) - plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd_max=%d", - FD_SETSIZE, fd_max); -#endif - /* wait for events */ - res = select(fd_max + 1, &read_fds, &write_fds, NULL, &timeout); + res = poll(pfd_cache, numfds, 1000); if (res == 0) return 0; if (res < 0) { if (errno == EINTR) return 0; - plproxy_error(func, "select() failed: %s", strerror(errno)); + plproxy_error(func, "poll() failed: %s", strerror(errno)); } /* now recheck the conns */ + pf = pfd_cache; for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; if (!conn->run_on) continue; - /* look in which set it should be */ switch (conn->state) { case C_DONE: @@ -493,21 +479,22 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) continue; case C_CONNECT_READ: case C_QUERY_READ: - cur_set = &read_fds; - break; case C_CONNECT_WRITE: case C_QUERY_WRITE: - cur_set = &write_fds; break; } - /* check */ + /* + * they should be in same order as called, + */ fd = PQsocket(conn->db); - if (fd < 0) - elog(WARNING, "libpq dropped socket: fd=%d err=%s", - fd, PQerrorMessage(conn->db)); - else if (FD_ISSET(fd, cur_set)) + if (pf->fd != fd) + elog(WARNING, "fd order from poll() is messed up?"); + + if (pf->revents) handle_conn(func, conn); + + pf++; } return 1; } |
