#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
return ++prep_stmt_number;
}
+/*
+ * Submit a query and wait for the result.
+ *
+ * This function is interruptible by signals.
+ *
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_exec_query(PGconn *conn, const char *query)
+{
+ /*
+ * Submit a query. Since we don't use non-blocking mode, this also can
+ * block. But its risk is relatively small, so we ignore that for now.
+ */
+ if (!PQsendQuery(conn, query))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+ /* Wait for the result. */
+ return pgfdw_get_result(conn, query);
+}
+
+/*
+ * Wait for the result from a prior asynchronous execution function call.
+ *
+ * This function offers quick responsiveness by checking for any interruptions.
+ *
+ * This function emulates the PQexec()'s behavior of returning the last result
+ * when there are many.
+ *
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_get_result(PGconn *conn, const char *query)
+{
+ PGresult *last_res = NULL;
+
+ for (;;)
+ {
+ PGresult *res;
+
+ while (PQisBusy(conn))
+ {
+ int wc;
+
+ /* Sleep until there's something to do */
+ wc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_SOCKET_READABLE,
+ PQsocket(conn),
+ -1L);
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Data available in socket */
+ if (wc & WL_SOCKET_READABLE)
+ {
+ if (!PQconsumeInput(conn))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
+ }
+ }
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ break; /* query is complete */
+
+ PQclear(last_res);
+ last_res = res;
+ }
+
+ return last_res;
+}
+
/*
* Report an error we got from the remote server.
*
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
+
+ /*
+ * If a command has been submitted to the remote server by
+ * using an asynchronous execution function, the command
+ * might not have yet completed. Check to see if a command
+ * is still being processed by the remote server, and if so,
+ * request cancellation of the command; if not, abort
+ * gracefully.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ PGcancel *cancel;
+ char errbuf[256];
+
+ if ((cancel = PQgetCancel(entry->conn)))
+ {
+ if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send cancel request: %s",
+ errbuf)));
+ PQfreeCancel(cancel);
+ }
+ break;
+ }
+
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
slot);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
NULL);
/*
- * Execute the prepared statement, and check for success.
+ * Execute the prepared statement.
+ */
+ if (!PQsendQueryPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecPrepared(fmstate->conn,
- fmstate->p_name,
- fmstate->p_nums,
- p_values,
- NULL,
- NULL,
- 0);
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
/*
* Execute EXPLAIN remotely.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
+ */
+ if (!PQsendQueryParams(conn, buf.data, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecParams(conn, buf.data, numParams, NULL, values,
- NULL, NULL, 0);
+ res = pgfdw_get_result(conn, buf.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
* with the remote server using different type OIDs than we do. All of
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
+ */
+ if (!PQsendPrepare(fmstate->conn,
+ p_name,
+ fmstate->query,
+ 0,
+ NULL))
+ pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQprepare(fmstate->conn,
- p_name,
- fmstate->query,
- 0,
- NULL);
-
+ res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
+ */
+ if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+ /*
+ * Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
- numParams, NULL, values, NULL, NULL, 0);
+ dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
fetch_size, cursor_number);
- res = PQexec(conn, fetch_sql);
+ res = pgfdw_exec_query(conn, fetch_sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
- res = PQexec(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
- res = PQexec(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);