Revert "libpqwalreceiver: Convert to libpq-be-fe-helpers.h"
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 23 Jan 2024 08:38:07 +0000 (10:38 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 23 Jan 2024 08:38:07 +0000 (10:38 +0200)
This reverts commit 728f86fec65537eade8d9e751961782ddb527934.

The signal handling was a few bricks shy of a load in that commit,
which made the walreceiver non-responsive to SIGTERM while it was
waiting for the connection to be established. That prevented a standby
from being promoted.

Since it was non-essential refactoring, let's revert it to make v16
work the same as earlier releases. I reverted it in 'master' too, to
keep the branches in sync. The refactoring was a good idea as such,
but it needs a bit more work. Once we have developed a complete patch
with this issue fixed, let's re-apply that to 'master'.

Reported-by: Kyotaro Horiguchi
Backpatch-through: 16
Discussion: https://www.postgresql.org/message-id/20231231.200741.1078989336605759878.horikyota.ntt@gmail.com

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

index 201c36cb220606bdee76c275ef98ee153dffe66e..77669074e826b28879f2dc2b4d319bfc0ee56014 100644 (file)
@@ -24,7 +24,6 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
-#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -133,6 +132,7 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
                                 const char *appname, char **err)
 {
        WalReceiverConn *conn;
+       PostgresPollingStatusType status;
        const char *keys[6];
        const char *vals[6];
        int                     i = 0;
@@ -188,17 +188,56 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
        Assert(i < sizeof(keys));
 
        conn = palloc0(sizeof(WalReceiverConn));
-       conn->streamConn =
-               libpqsrv_connect_params(keys, vals,
-                                                                /* expand_dbname = */ true,
-                                                               WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+       conn->streamConn = PQconnectStartParams(keys, vals,
+                                                                                        /* expand_dbname = */ true);
+       if (PQstatus(conn->streamConn) == CONNECTION_BAD)
+               goto bad_connection_errmsg;
+
+       /*
+        * Poll connection until we have OK or FAILED status.
+        *
+        * Per spec for PQconnectPoll, first wait till socket is write-ready.
+        */
+       status = PGRES_POLLING_WRITING;
+       do
+       {
+               int                     io_flag;
+               int                     rc;
+
+               if (status == PGRES_POLLING_READING)
+                       io_flag = WL_SOCKET_READABLE;
+#ifdef WIN32
+               /* Windows needs a different test while waiting for connection-made */
+               else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
+                       io_flag = WL_SOCKET_CONNECTED;
+#endif
+               else
+                       io_flag = WL_SOCKET_WRITEABLE;
+
+               rc = WaitLatchOrSocket(MyLatch,
+                                                          WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
+                                                          PQsocket(conn->streamConn),
+                                                          0,
+                                                          WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+
+               /* Interrupted? */
+               if (rc & WL_LATCH_SET)
+               {
+                       ResetLatch(MyLatch);
+                       ProcessWalRcvInterrupts();
+               }
+
+               /* If socket is ready, advance the libpq state machine */
+               if (rc & io_flag)
+                       status = PQconnectPoll(conn->streamConn);
+       } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
        if (PQstatus(conn->streamConn) != CONNECTION_OK)
                goto bad_connection_errmsg;
 
        if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
        {
-               libpqsrv_disconnect(conn->streamConn);
+               PQfinish(conn->streamConn);
                pfree(conn);
 
                ereport(ERROR,
@@ -234,7 +273,7 @@ bad_connection_errmsg:
 
        /* error path, error already set */
 bad_connection:
-       libpqsrv_disconnect(conn->streamConn);
+       PQfinish(conn->streamConn);
        pfree(conn);
        return NULL;
 }
@@ -770,7 +809,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-       libpqsrv_disconnect(conn->streamConn);
+       PQfinish(conn->streamConn);
        PQfreemem(conn->recvBuf);
        pfree(conn);
 }