Rewrite async-connection loop in libpqwalreceiver.c, once again.
authorTom Lane <tgl@sss.pgh.pa.us>
Wed, 15 Mar 2017 17:26:26 +0000 (13:26 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Wed, 15 Mar 2017 17:26:26 +0000 (13:26 -0400)
The original coding in commit 1e8a85009 didn't use PQconnectPoll per
spec, and while the rewrite in e434ad39a is closer, it still doesn't
guarantee to wait until the socket is read-ready or write-ready (as
appropriate) before calling PQconnectPoll.  It's not clear whether
that omission is causing the continuing failures on buildfarm member
bowerbird; but given the lack of other explanations meeting the
available facts, let's tighten that up and see what happens.

An independent issue in the same loop was that it had a race condition
whereby it could clear the process's latch without having serviced an
interrupt request, causing failure to respond to a cancel while waiting
for connection (the very problem 1e8a85009 was meant to fix).

Discussion: https://postgr.es/m/7295.1489596949@sss.pgh.pa.us

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

index cd2e57867c062acc8132abc8f91656481aaa35c2..65a9e6c81ce82960f9b6837e18a378623fc898e7 100644 (file)
@@ -159,41 +159,41 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
        /*
         * Poll connection until we have OK or FAILED status.
         *
-        * Note that the initial state after PQconnectStartParams is
-        * PGRES_POLLING_WRITING.
+        * Per spec for PQconnectPoll, first wait till socket is write-ready.
         */
-       for (status = PGRES_POLLING_WRITING;
-                status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED;
-                status = PQconnectPoll(conn->streamConn))
+       status = PGRES_POLLING_WRITING;
+       do
        {
-               /* Sleep a bit if waiting for socket. */
-               if (status == PGRES_POLLING_READING ||
-                       status == PGRES_POLLING_WRITING)
+               /* Wait for socket ready and/or other events. */
+               int                     io_flag;
+               int                     rc;
+
+               io_flag = (status == PGRES_POLLING_READING
+                                  ? WL_SOCKET_READABLE
+                                  : WL_SOCKET_WRITEABLE);
+
+               rc = WaitLatchOrSocket(&MyProc->procLatch,
+                                                          WL_POSTMASTER_DEATH |
+                                                          WL_LATCH_SET | io_flag,
+                                                          PQsocket(conn->streamConn),
+                                                          0,
+                                                          WAIT_EVENT_LIBPQWALRECEIVER);
+
+               /* Emergency bailout? */
+               if (rc & WL_POSTMASTER_DEATH)
+                       exit(1);
+
+               /* Interrupted? */
+               if (rc & WL_LATCH_SET)
                {
-                       int             extra_flag;
-                       int             rc;
-
-                       extra_flag = (status == PGRES_POLLING_READING
-                                                 ? WL_SOCKET_READABLE
-                                                 : WL_SOCKET_WRITEABLE);
-
                        ResetLatch(&MyProc->procLatch);
-                       rc = WaitLatchOrSocket(&MyProc->procLatch,
-                                                                  WL_POSTMASTER_DEATH |
-                                                                  WL_LATCH_SET | extra_flag,
-                                                                  PQsocket(conn->streamConn),
-                                                                  0,
-                                                                  WAIT_EVENT_LIBPQWALRECEIVER);
-
-                       /* Emergency bailout. */
-                       if (rc & WL_POSTMASTER_DEATH)
-                               exit(1);
-
-                       /* Interrupted. */
-                       if (rc & WL_LATCH_SET)
-                               CHECK_FOR_INTERRUPTS();
+                       CHECK_FOR_INTERRUPTS();
                }
-       }
+
+               /* If socket is ready, advance the libpq state machine */
+               if (rc & io_flag)
+                       status = PQconnectPoll(conn->streamConn);
+       } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
        if (PQstatus(conn->streamConn) != CONNECTION_OK)
        {