#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
-#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
const char *appname, char **err)
{
WalReceiverConn *conn;
+ PostgresPollingStatusType status;
const char *keys[6];
const char *vals[6];
int i = 0;
Assert(i < sizeof(keys));
conn = palloc0(sizeof(WalReceiverConn));
- conn->streamConn =
- libpqsrv_connect_params(keys, vals,
- /* expand_dbname = */ true,
- WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+ conn->streamConn = PQconnectStartParams(keys, vals,
+ /* expand_dbname = */ true);
+ if (PQstatus(conn->streamConn) == CONNECTION_BAD)
+ goto bad_connection_errmsg;
+
+ /*
+ * Poll connection until we have OK or FAILED status.
+ *
+ * Per spec for PQconnectPoll, first wait till socket is write-ready.
+ */
+ status = PGRES_POLLING_WRITING;
+ do
+ {
+ int io_flag;
+ int rc;
+
+ if (status == PGRES_POLLING_READING)
+ io_flag = WL_SOCKET_READABLE;
+#ifdef WIN32
+ /* Windows needs a different test while waiting for connection-made */
+ else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
+ io_flag = WL_SOCKET_CONNECTED;
+#endif
+ else
+ io_flag = WL_SOCKET_WRITEABLE;
+
+ rc = WaitLatchOrSocket(MyLatch,
+ WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
+ PQsocket(conn->streamConn),
+ 0,
+ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+
+ /* Interrupted? */
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ ProcessWalRcvInterrupts();
+ }
+
+ /* If socket is ready, advance the libpq state machine */
+ if (rc & io_flag)
+ status = PQconnectPoll(conn->streamConn);
+ } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
if (PQstatus(conn->streamConn) != CONNECTION_OK)
goto bad_connection_errmsg;
if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
{
- libpqsrv_disconnect(conn->streamConn);
+ PQfinish(conn->streamConn);
pfree(conn);
ereport(ERROR,
/* error path, error already set */
bad_connection:
- libpqsrv_disconnect(conn->streamConn);
+ PQfinish(conn->streamConn);
pfree(conn);
return NULL;
}
static void
libpqrcv_disconnect(WalReceiverConn *conn)
{
- libpqsrv_disconnect(conn->streamConn);
+ PQfinish(conn->streamConn);
PQfreemem(conn->recvBuf);
pfree(conn);
}