Revert "libpqwalreceiver: Convert to libpq-be-fe-helpers.h"
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 23 Jan 2024 08:38:07 +0000 (10:38 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 23 Jan 2024 08:53:23 +0000 (10:53 +0200)
This reverts commit 728f86fec65537eade8d9e751961782ddb527934.

The signal handling was a few bricks shy of a load in that commit,
which made the walreceiver non-responsive to SIGTERM while it was
waiting for the connection to be established. That prevented a standby
from being promoted.

Since it was non-essential refactoring, let's revert it to make v16
work the same as earlier releases. I reverted it in 'master' too, to
keep the branches in sync. The refactoring was a good idea as such,
but it needs a bit more work. Once we have developed a complete patch
with this issue fixed, let's re-apply that to 'master'.

Reported-by: Kyotaro Horiguchi
Backpatch-through: 16
Discussion: https://www.postgresql.org/message-id/20231231.200741.1078989336605759878.horikyota.ntt@gmail.com

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

index 41910fd34e5de6357e973393755fa1cfe6087b21..b4038e114d8ca11f2bfa4380431ff8f2d6c09dcc 100644 (file)
@@ -24,7 +24,6 @@
 #include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
-#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -133,6 +132,7 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
                 const char *appname, char **err)
 {
    WalReceiverConn *conn;
+   PostgresPollingStatusType status;
    const char *keys[6];
    const char *vals[6];
    int         i = 0;
@@ -188,17 +188,56 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
    Assert(i < sizeof(keys));
 
    conn = palloc0(sizeof(WalReceiverConn));
-   conn->streamConn =
-       libpqsrv_connect_params(keys, vals,
-                                /* expand_dbname = */ true,
-                               WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+   conn->streamConn = PQconnectStartParams(keys, vals,
+                                            /* expand_dbname = */ true);
+   if (PQstatus(conn->streamConn) == CONNECTION_BAD)
+       goto bad_connection_errmsg;
+
+   /*
+    * Poll connection until we have OK or FAILED status.
+    *
+    * Per spec for PQconnectPoll, first wait till socket is write-ready.
+    */
+   status = PGRES_POLLING_WRITING;
+   do
+   {
+       int         io_flag;
+       int         rc;
+
+       if (status == PGRES_POLLING_READING)
+           io_flag = WL_SOCKET_READABLE;
+#ifdef WIN32
+       /* Windows needs a different test while waiting for connection-made */
+       else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
+           io_flag = WL_SOCKET_CONNECTED;
+#endif
+       else
+           io_flag = WL_SOCKET_WRITEABLE;
+
+       rc = WaitLatchOrSocket(MyLatch,
+                              WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
+                              PQsocket(conn->streamConn),
+                              0,
+                              WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+
+       /* Interrupted? */
+       if (rc & WL_LATCH_SET)
+       {
+           ResetLatch(MyLatch);
+           ProcessWalRcvInterrupts();
+       }
+
+       /* If socket is ready, advance the libpq state machine */
+       if (rc & io_flag)
+           status = PQconnectPoll(conn->streamConn);
+   } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
 
    if (PQstatus(conn->streamConn) != CONNECTION_OK)
        goto bad_connection_errmsg;
 
    if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
    {
-       libpqsrv_disconnect(conn->streamConn);
+       PQfinish(conn->streamConn);
        pfree(conn);
 
        ereport(ERROR,
@@ -234,7 +273,7 @@ bad_connection_errmsg:
 
    /* error path, error already set */
 bad_connection:
-   libpqsrv_disconnect(conn->streamConn);
+   PQfinish(conn->streamConn);
    pfree(conn);
    return NULL;
 }
@@ -772,7 +811,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-   libpqsrv_disconnect(conn->streamConn);
+   PQfinish(conn->streamConn);
    PQfreemem(conn->recvBuf);
    pfree(conn);
 }