summaryrefslogtreecommitdiff
path: root/contrib/dblink/dblink.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/dblink/dblink.c')
-rw-r--r--contrib/dblink/dblink.c25
1 files changed, 14 insertions, 11 deletions
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 824d301aad8..aa018b7747e 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -48,6 +48,7 @@
#include "funcapi.h"
#include "lib/stringinfo.h"
#include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "parser/scansup.h"
@@ -59,6 +60,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/varlena.h"
+#include "utils/wait_event.h"
PG_MODULE_MAGIC;
@@ -478,7 +480,7 @@ dblink_open(PG_FUNCTION_ARGS)
/* If we are not in a transaction, start one */
if (PQtransactionStatus(conn) == PQTRANS_IDLE)
{
- res = PQexec(conn, "BEGIN");
+ res = libpqsrv_exec(conn, "BEGIN", PG_WAIT_EXTENSION);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
dblink_res_internalerror(conn, res, "begin error");
PQclear(res);
@@ -497,7 +499,7 @@ dblink_open(PG_FUNCTION_ARGS)
(rconn->openCursorCount)++;
appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
- res = PQexec(conn, buf.data);
+ res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
dblink_res_error(conn, conname, res, fail,
@@ -566,7 +568,7 @@ dblink_close(PG_FUNCTION_ARGS)
appendStringInfo(&buf, "CLOSE %s", curname);
/* close the cursor */
- res = PQexec(conn, buf.data);
+ res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
{
dblink_res_error(conn, conname, res, fail,
@@ -586,7 +588,7 @@ dblink_close(PG_FUNCTION_ARGS)
{
rconn->newXactForCursor = false;
- res = PQexec(conn, "COMMIT");
+ res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
dblink_res_internalerror(conn, res, "commit error");
PQclear(res);
@@ -668,7 +670,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
* PGresult will be long-lived even though we are still in a short-lived
* memory context.
*/
- res = PQexec(conn, buf.data);
+ res = libpqsrv_exec(conn, buf.data, PG_WAIT_EXTENSION);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -816,7 +818,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
else
{
/* async result retrieval, do it the old way */
- PGresult *res = PQgetResult(conn);
+ PGresult *res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
/* NULL means we're all done with the async results */
if (res)
@@ -1130,7 +1132,8 @@ materializeQueryResult(FunctionCallInfo fcinfo,
PQclear(sinfo.last_res);
PQclear(sinfo.cur_res);
/* and clear out any pending data in libpq */
- while ((res = PQgetResult(conn)) != NULL)
+ while ((res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION)) !=
+ NULL)
PQclear(res);
PG_RE_THROW();
}
@@ -1157,7 +1160,7 @@ storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
{
CHECK_FOR_INTERRUPTS();
- sinfo->cur_res = PQgetResult(conn);
+ sinfo->cur_res = libpqsrv_get_result(conn, PG_WAIT_EXTENSION);
if (!sinfo->cur_res)
break;
@@ -1485,7 +1488,7 @@ dblink_exec(PG_FUNCTION_ARGS)
if (!conn)
dblink_conn_not_avail(conname);
- res = PQexec(conn, sql);
+ res = libpqsrv_exec(conn, sql, PG_WAIT_EXTENSION);
if (!res ||
(PQresultStatus(res) != PGRES_COMMAND_OK &&
PQresultStatus(res) != PGRES_TUPLES_OK))
@@ -2771,8 +2774,8 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
/*
* If we don't get a message from the PGresult, try the PGconn. This is
- * needed because for connection-level failures, PQexec may just return
- * NULL, not a PGresult at all.
+ * needed because for connection-level failures, PQgetResult may just
+ * return NULL, not a PGresult at all.
*/
if (message_primary == NULL)
message_primary = pchomp(PQerrorMessage(conn));