Simplify waiting logic in reading from / writing to client.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 13 Feb 2015 19:46:14 +0000 (21:46 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 13 Feb 2015 19:46:14 +0000 (21:46 +0200)
The client socket is always in non-blocking mode, and if we actually want
blocking behaviour, we emulate it by sleeping and retrying. But we have
retry loops at different layers for reads and writes, which was confusing.
To simplify, remove all the sleeping and retrying code from the lower
levels, from be_tls_read and secure_raw_read and secure_raw_write, and put
all the logic in secure_read() and secure_write().

src/backend/libpq/be-secure-openssl.c
src/backend/libpq/be-secure.c
src/backend/libpq/pqcomm.c
src/include/libpq/libpq-be.h

index d13ce334cccf8145c8882db3fd7e072d07e3847d..37af6e4fdaf75ab531123859852362a4d2527822 100644 (file)
@@ -511,14 +511,11 @@ be_tls_close(Port *port)
  *     Read data from a secure connection.
  */
 ssize_t
-be_tls_read(Port *port, void *ptr, size_t len)
+be_tls_read(Port *port, void *ptr, size_t len, int *waitfor)
 {
        ssize_t         n;
        int                     err;
-       int                     waitfor;
-       int                     latchret;
 
-rloop:
        errno = 0;
        n = SSL_read(port->ssl, ptr, len);
        err = SSL_get_error(port->ssl, n);
@@ -528,39 +525,15 @@ rloop:
                        port->count += n;
                        break;
                case SSL_ERROR_WANT_READ:
+                       *waitfor = WL_SOCKET_READABLE;
+                       errno = EWOULDBLOCK;
+                       n = -1;
+                       break;
                case SSL_ERROR_WANT_WRITE:
-                       /* Don't retry if the socket is in nonblocking mode. */
-                       if (port->noblock)
-                       {
-                               errno = EWOULDBLOCK;
-                               n = -1;
-                               break;
-                       }
-
-                       waitfor = WL_LATCH_SET;
-
-                       if (err == SSL_ERROR_WANT_READ)
-                               waitfor |= WL_SOCKET_READABLE;
-                       else
-                               waitfor |= WL_SOCKET_WRITEABLE;
-
-                       latchret = WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0);
-
-                       /*
-                        * We'll, among other situations, get here if the low level
-                        * routine doing the actual recv() via the socket got interrupted
-                        * by a signal. That's so we can handle interrupts once outside
-                        * openssl, so we don't jump out from underneath its covers. We
-                        * can check this both, when reading and writing, because even
-                        * when writing that's just openssl's doing, not a 'proper' write
-                        * initiated by postgres.
-                        */
-                       if (latchret & WL_LATCH_SET)
-                       {
-                               ResetLatch(MyLatch);
-                               ProcessClientReadInterrupt(true);  /* preserves errno */
-                       }
-                       goto rloop;
+                       *waitfor = WL_SOCKET_WRITEABLE;
+                       errno = EWOULDBLOCK;
+                       n = -1;
+                       break;
                case SSL_ERROR_SYSCALL:
                        /* leave it to caller to ereport the value of errno */
                        if (n != -1)
@@ -595,12 +568,10 @@ rloop:
  *     Write data to a secure connection.
  */
 ssize_t
-be_tls_write(Port *port, void *ptr, size_t len)
+be_tls_write(Port *port, void *ptr, size_t len, int *waitfor)
 {
        ssize_t         n;
        int                     err;
-       int                     waitfor;
-       int                     latchret;
 
        /*
         * If SSL renegotiations are enabled and we're getting close to the
@@ -630,7 +601,6 @@ be_tls_write(Port *port, void *ptr, size_t len)
                                         errmsg("SSL failure during renegotiation start")));
        }
 
-wloop:
        errno = 0;
        n = SSL_write(port->ssl, ptr, len);
        err = SSL_get_error(port->ssl, n);
@@ -640,30 +610,15 @@ wloop:
                        port->count += n;
                        break;
                case SSL_ERROR_WANT_READ:
+                       *waitfor = WL_SOCKET_READABLE;
+                       errno = EWOULDBLOCK;
+                       n = -1;
+                       break;
                case SSL_ERROR_WANT_WRITE:
-
-                       waitfor = WL_LATCH_SET;
-
-                       if (err == SSL_ERROR_WANT_READ)
-                               waitfor |= WL_SOCKET_READABLE;
-                       else
-                               waitfor |= WL_SOCKET_WRITEABLE;
-
-                       latchret = WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0);
-
-                       /*
-                        * Check for interrupts here, in addition to secure_write(),
-                        * because an interrupted write in secure_raw_write() will return
-                        * here, and we cannot return to secure_write() until we've
-                        * written something.
-                        */
-                       if (latchret & WL_LATCH_SET)
-                       {
-                               ResetLatch(MyLatch);
-                               ProcessClientWriteInterrupt(true); /* preserves errno */
-                       }
-
-                       goto wloop;
+                       *waitfor = WL_SOCKET_WRITEABLE;
+                       errno = EWOULDBLOCK;
+                       n = -1;
+                       break;
                case SSL_ERROR_SYSCALL:
                        /* leave it to caller to ereport the value of errno */
                        if (n != -1)
index c2c1842eb8ee80d2c94b8c3e6345e15ca307a746..4e7acbe0804c6cc4befe455a383769ba9b33c9e0 100644 (file)
@@ -127,30 +127,45 @@ ssize_t
 secure_read(Port *port, void *ptr, size_t len)
 {
        ssize_t         n;
+       int                     waitfor;
 
 retry:
 #ifdef USE_SSL
+       waitfor = 0;
        if (port->ssl_in_use)
        {
-               n = be_tls_read(port, ptr, len);
+               n = be_tls_read(port, ptr, len, &waitfor);
        }
        else
 #endif
        {
                n = secure_raw_read(port, ptr, len);
+               waitfor = WL_SOCKET_READABLE;
        }
 
-       /* retry after processing interrupts */
-       if (n < 0 && errno == EINTR)
+       /* In blocking mode, wait until the socket is ready */
+       if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
        {
-               /*
-                * We tried to read data, the socket was empty, and we were
-                * interrupted while waiting for readability. We only process
-                * interrupts if we got interrupted while reading and when in blocking
-                * mode. In other cases it's better to allow the interrupts to be
-                * handled at higher layers.
-                */
-               ProcessClientReadInterrupt(!port->noblock); /* preserves errno */
+               int             w;
+
+               Assert(waitfor);
+
+               w = WaitLatchOrSocket(MyLatch,
+                                                         WL_LATCH_SET | waitfor,
+                                                         port->sock, 0);
+
+               /* Handle interrupt. */
+               if (w & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       ProcessClientReadInterrupt(true);
+
+                       /*
+                        * We'll retry the read. Most likely it will return immediately
+                        * because there's still no data available, and we'll wait
+                        * for the socket to become ready again.
+                        */
+               }
                goto retry;
        }
 
@@ -173,7 +188,6 @@ secure_raw_read(Port *port, void *ptr, size_t len)
         * Try to read from the socket without blocking. If it succeeds we're
         * done, otherwise we'll wait for the socket using the latch mechanism.
         */
-rloop:
 #ifdef WIN32
        pgwin32_noblock = true;
 #endif
@@ -182,37 +196,6 @@ rloop:
        pgwin32_noblock = false;
 #endif
 
-       if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
-       {
-               int             w;
-               int             save_errno = errno;
-
-               w = WaitLatchOrSocket(MyLatch,
-                                                         WL_LATCH_SET | WL_SOCKET_READABLE,
-                                                         port->sock, 0);
-
-               if (w & WL_LATCH_SET)
-               {
-                       ResetLatch(MyLatch);
-                       /*
-                        * Force a return, so interrupts can be processed when not
-                        * (possibly) underneath a ssl library.
-                        */
-                       errno = EINTR;
-                       return -1;
-               }
-               else if (w & WL_SOCKET_READABLE)
-               {
-                       goto rloop;
-               }
-
-               /*
-                * Restore errno, clobbered by WaitLatchOrSocket, so the caller can
-                * react properly.
-                */
-               errno = save_errno;
-       }
-
        return n;
 }
 
@@ -224,33 +207,54 @@ ssize_t
 secure_write(Port *port, void *ptr, size_t len)
 {
        ssize_t         n;
+       int                     waitfor;
 
 retry:
+       waitfor = 0;
 #ifdef USE_SSL
        if (port->ssl_in_use)
        {
-               n = be_tls_write(port, ptr, len);
+               n = be_tls_write(port, ptr, len, &waitfor);
        }
        else
 #endif
        {
                n = secure_raw_write(port, ptr, len);
+               waitfor = WL_SOCKET_WRITEABLE;
        }
 
-       /* retry after processing interrupts */
-       if (n < 0 && errno == EINTR)
+       if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
        {
-               /*
-                * We tried to send data, the socket was full, and we were interrupted
-                * while waiting for writability. We only process interrupts if we got
-                * interrupted while writing and when in blocking mode. In other cases
-                * it's better to allow the interrupts to be handled at higher layers.
-                */
-               ProcessClientWriteInterrupt(!port->noblock);
+               int             w;
+
+               Assert(waitfor);
 
+               w = WaitLatchOrSocket(MyLatch,
+                                                         WL_LATCH_SET | waitfor,
+                                                         port->sock, 0);
+
+               /* Handle interrupt. */
+               if (w & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       ProcessClientWriteInterrupt(true);
+
+                       /*
+                        * We'll retry the write. Most likely it will return immediately
+                        * because there's still no data available, and we'll wait
+                        * for the socket to become ready again.
+                        */
+               }
                goto retry;
        }
 
+       /*
+        * Process interrupts that happened while (or before) sending. Note that
+        * we signal that we're not blocking, which will prevent some types of
+        * interrupts from being processed.
+        */
+       ProcessClientWriteInterrupt(false);
+
        return n;
 }
 
@@ -259,8 +263,6 @@ secure_raw_write(Port *port, const void *ptr, size_t len)
 {
        ssize_t         n;
 
-wloop:
-
 #ifdef WIN32
        pgwin32_noblock = true;
 #endif
@@ -269,36 +271,5 @@ wloop:
        pgwin32_noblock = false;
 #endif
 
-       if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
-       {
-               int             w;
-               int             save_errno = errno;
-
-               w = WaitLatchOrSocket(MyLatch,
-                                                         WL_LATCH_SET | WL_SOCKET_WRITEABLE,
-                                                         port->sock, 0);
-
-               if (w & WL_LATCH_SET)
-               {
-                       ResetLatch(MyLatch);
-                       /*
-                        * Force a return, so interrupts can be processed when not
-                        * (possibly) underneath a ssl library.
-                        */
-                       errno = EINTR;
-                       return -1;
-               }
-               else if (w & WL_SOCKET_WRITEABLE)
-               {
-                       goto wloop;
-               }
-
-               /*
-                * Restore errno, clobbered by WaitLatchOrSocket, so the caller can
-                * react properly.
-                */
-               errno = save_errno;
-       }
-
        return n;
 }
index 09dea4bbe165d921c904fb0087a2193205756a9e..34efac48651538abe8a6bf5a4b6a4cc8a2179ba1 100644 (file)
@@ -185,7 +185,8 @@ pq_init(void)
        /*
         * In backends (as soon as forked) we operate the underlying socket in
         * nonblocking mode and use latches to implement blocking semantics if
-        * needed. That allows us to provide safely interruptible reads.
+        * needed. That allows us to provide safely interruptible reads and
+        * writes.
         *
         * Use COMMERROR on failure, because ERROR would try to send the error to
         * the client, which might require changing the mode again, leading to
index ccd70218ae60f7e067b6237ceef2f2a79406d178..cf520f545d916cea83bba5c6ef3953e61862ef4b 100644 (file)
@@ -209,8 +209,8 @@ typedef struct Port
 extern void be_tls_init(void);
 extern int be_tls_open_server(Port *port);
 extern void be_tls_close(Port *port);
-extern ssize_t be_tls_read(Port *port, void *ptr, size_t len);
-extern ssize_t be_tls_write(Port *port, void *ptr, size_t len);
+extern ssize_t be_tls_read(Port *port, void *ptr, size_t len, int *waitfor);
+extern ssize_t be_tls_write(Port *port, void *ptr, size_t len, int *waitfor);
 
 #endif