summaryrefslogtreecommitdiff
path: root/src/execute.c
diff options
context:
space:
mode:
authorMarko Kreen2007-12-20 22:13:45 +0000
committerMarko Kreen2007-12-20 22:13:45 +0000
commit777f778ec18675f0d025dcf427bfad526fad60d9 (patch)
treea529deacde0f8486176fa05d6c0aeb7a3c014830 /src/execute.c
parentae5ec4d4a5e5b4abce608f34057839223e4664c7 (diff)
replace select() with poll() to allow large fd values
Diffstat (limited to 'src/execute.c')
-rw-r--r--src/execute.c109
1 files changed, 48 insertions, 61 deletions
diff --git a/src/execute.c b/src/execute.c
index 02361c1..0e75c6a 100644
--- a/src/execute.c
+++ b/src/execute.c
@@ -28,7 +28,8 @@
#include "plproxy.h"
#include <sys/time.h>
-#include <sys/select.h>
+
+#include "poll_compat.h"
/* some error happened */
static void
@@ -198,22 +199,12 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
{
time_t t;
int res;
- fd_set fds;
- int fd;
- struct timeval notimeout = {0, 0};
+ struct pollfd pfd;
ProxyConfig *cf = &func->cur_cluster->config;
if (PQstatus(conn->db) != CONNECTION_OK)
return false;
- fd = PQsocket(conn->db);
- if (fd < 0)
- {
- elog(WARNING, "libpq socket lost: fd=%d, err=%s",
- fd, PQerrorMessage(conn->db));
- return false;
- }
-
/* check if too old */
if (cf->connection_lifetime > 0)
{
@@ -232,16 +223,11 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
* are events pending. If there are drop the connection.
*/
intr_loop:
- /* just in case detect if too many fds */
-#ifdef FD_SETSIZE
- if (fd >= FD_SETSIZE)
- plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd=%d",
- FD_SETSIZE, fd);
-#endif
-
- FD_ZERO(&fds);
- FD_SET(fd, &fds);
- res = select(fd + 1, &fds, NULL, NULL, &notimeout);
+ pfd.fd = PQsocket(conn->db);
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ res = poll(&pfd, 1, 0);
if (res > 0)
{
elog(WARNING, "PL/Proxy: detected unstable connection");
@@ -409,18 +395,32 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn)
static int
poll_conns(ProxyFunction *func, ProxyCluster *cluster)
{
+ static struct pollfd *pfd_cache = NULL;
+ static int pfd_allocated = 0;
+
int i,
res,
- fd,
- fd_max = 0;
- fd_set read_fds;
- fd_set write_fds;
- fd_set *cur_set = NULL;
- struct timeval timeout;
+ fd;
ProxyConnection *conn;
+ struct pollfd *pf;
+ int numfds = 0;
+ int ev = 0;
- FD_ZERO(&read_fds);
- FD_ZERO(&write_fds);
+ if (pfd_allocated < cluster->conn_count)
+ {
+ struct pollfd *tmp;
+ int num = cluster->conn_count;
+ if (num < 64)
+ num = 64;
+ if (pfd_cache == NULL)
+ tmp = malloc(num * sizeof(struct pollfd));
+ else
+ tmp = realloc(pfd_cache, num * sizeof(struct pollfd));
+ if (!tmp)
+ elog(ERROR, "no mem for pollfd cache");
+ pfd_cache = tmp;
+ pfd_allocated = num;
+ }
for (i = 0; i < cluster->conn_count; i++)
{
@@ -437,54 +437,40 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
continue;
case C_CONNECT_READ:
case C_QUERY_READ:
- cur_set = &read_fds;
+ ev = POLLIN;
break;
case C_CONNECT_WRITE:
case C_QUERY_WRITE:
- cur_set = &write_fds;
+ ev = POLLOUT;
break;
}
/* add fd to proper set */
- fd = PQsocket(conn->db);
- if (fd > fd_max)
- fd_max = fd;
- else if (fd < 0)
- plproxy_error(func, "libpq has lost its socket: fd=%d err=%s",
- fd, PQerrorMessage(conn->db));
- FD_SET(fd, cur_set);
+ pf = pfd_cache + numfds++;
+ pf->fd = PQsocket(conn->db);
+ pf->events = ev;
+ pf->revents = 0;
}
- /* set timeout */
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
-
- /* just in case detect if too many fds */
-#ifdef FD_SETSIZE
- if (fd_max > FD_SETSIZE)
- plproxy_error(func, "Sorry, fd_set to select() too big: FD_SETSIZE=%d, fd_max=%d",
- FD_SETSIZE, fd_max);
-#endif
-
/* wait for events */
- res = select(fd_max + 1, &read_fds, &write_fds, NULL, &timeout);
+ res = poll(pfd_cache, numfds, 1000);
if (res == 0)
return 0;
if (res < 0)
{
if (errno == EINTR)
return 0;
- plproxy_error(func, "select() failed: %s", strerror(errno));
+ plproxy_error(func, "poll() failed: %s", strerror(errno));
}
/* now recheck the conns */
+ pf = pfd_cache;
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
if (!conn->run_on)
continue;
- /* look in which set it should be */
switch (conn->state)
{
case C_DONE:
@@ -493,21 +479,22 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
continue;
case C_CONNECT_READ:
case C_QUERY_READ:
- cur_set = &read_fds;
- break;
case C_CONNECT_WRITE:
case C_QUERY_WRITE:
- cur_set = &write_fds;
break;
}
- /* check */
+ /*
+ * they should be in same order as called,
+ */
fd = PQsocket(conn->db);
- if (fd < 0)
- elog(WARNING, "libpq dropped socket: fd=%d err=%s",
- fd, PQerrorMessage(conn->db));
- else if (FD_ISSET(fd, cur_set))
+ if (pf->fd != fd)
+ elog(WARNING, "fd order from poll() is messed up?");
+
+ if (pf->revents)
handle_conn(func, conn);
+
+ pf++;
}
return 1;
}