Allow queries submitted by postgres_fdw to be canceled.
authorRobert Haas <rhaas@postgresql.org>
Thu, 21 Apr 2016 14:46:09 +0000 (10:46 -0400)
committerRobert Haas <rhaas@postgresql.org>
Thu, 21 Apr 2016 14:49:09 +0000 (10:49 -0400)
This fixes a problem which is not new, but with the advent of direct
foreign table modification in 0bf3ae88af330496517722e391e7c975e6bad219,
it's somewhat more likely to be annoying than previously.  So,
arrange for a local query cancelation to propagate to the remote side.

Michael Paquier, reviewed by Etsuro Fujita.  Original report by
Thom Brown.

contrib/postgres_fdw/connection.c
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/postgres_fdw.h

index 189f290cdf6a94b309042822f26a80986ef78b6c..16ef38fff785a8a27c4b6baa86bb7eee745e72b0 100644 (file)
@@ -17,6 +17,7 @@
 #include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
 
@@ -447,6 +448,78 @@ GetPrepStmtNumber(PGconn *conn)
    return ++prep_stmt_number;
 }
 
+/*
+ * Submit a query and wait for the result.
+ *
+ * This function is interruptible by signals.
+ *
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_exec_query(PGconn *conn, const char *query)
+{
+   /*
+    * Submit a query.  Since we don't use non-blocking mode, this also can
+    * block.  But its risk is relatively small, so we ignore that for now.
+    */
+   if (!PQsendQuery(conn, query))
+       pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+   /* Wait for the result. */
+   return pgfdw_get_result(conn, query);
+}
+
+/*
+ * Wait for the result from a prior asynchronous execution function call.
+ *
+ * This function offers quick responsiveness by checking for any interruptions.
+ *
+ * This function emulates the PQexec()'s behavior of returning the last result
+ * when there are many.
+ *
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_get_result(PGconn *conn, const char *query)
+{
+   PGresult   *last_res = NULL;
+
+   for (;;)
+   {
+       PGresult *res;
+
+       while (PQisBusy(conn))
+       {
+           int     wc;
+
+           /* Sleep until there's something to do */
+           wc = WaitLatchOrSocket(MyLatch,
+                                  WL_LATCH_SET | WL_SOCKET_READABLE,
+                                  PQsocket(conn),
+                                  -1L);
+           ResetLatch(MyLatch);
+
+           CHECK_FOR_INTERRUPTS();
+
+           /* Data available in socket */
+           if (wc & WL_SOCKET_READABLE)
+           {
+               if (!PQconsumeInput(conn))
+                   pgfdw_report_error(ERROR, NULL, conn, false, query);
+           }
+       }
+
+       res = PQgetResult(conn);
+       if (res == NULL)
+           break;              /* query is complete */
+
+       PQclear(last_res);
+       last_res = res;
+   }
+
+   return last_res;
+}
+
 /*
  * Report an error we got from the remote server.
  *
@@ -598,6 +671,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                case XACT_EVENT_ABORT:
                    /* Assume we might have lost track of prepared statements */
                    entry->have_error = true;
+
+                   /*
+                    * If a command has been submitted to the remote server by
+                    * using an asynchronous execution function, the command
+                    * might not have yet completed.  Check to see if a command
+                    * is still being processed by the remote server, and if so,
+                    * request cancellation of the command; if not, abort
+                    * gracefully.
+                    */
+                   if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+                   {
+                       PGcancel   *cancel;
+                       char        errbuf[256];
+
+                       if ((cancel = PQgetCancel(entry->conn)))
+                       {
+                           if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+                               ereport(WARNING,
+                                       (errcode(ERRCODE_CONNECTION_FAILURE),
+                                        errmsg("could not send cancel request: %s",
+                                               errbuf)));
+                           PQfreeCancel(cancel);
+                       }
+                       break;
+                   }
+
                    /* If we're aborting, abort all remote transactions too */
                    res = PQexec(entry->conn, "ABORT TRANSACTION");
                    /* Note: can't throw ERROR, it would be infinite loop */
index 28093e545628d52f0086b57ae2213e7c88033188..2f492683a86e130cdec5afc232599f7be097e74e 100644 (file)
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = PQexec(fsstate->conn, sql);
+   res = pgfdw_exec_query(fsstate->conn, sql);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
    PQclear(res);
@@ -1749,18 +1749,24 @@ postgresExecForeignInsert(EState *estate,
    p_values = convert_prep_stmt_params(fmstate, NULL, slot);
 
    /*
-    * Execute the prepared statement, and check for success.
+    * Execute the prepared statement.
+    */
+   if (!PQsendQueryPrepared(fmstate->conn,
+                            fmstate->p_name,
+                            fmstate->p_nums,
+                            p_values,
+                            NULL,
+                            NULL,
+                            0))
+       pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+   /*
+    * Get the result, and check for success.
     *
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = PQexecPrepared(fmstate->conn,
-                        fmstate->p_name,
-                        fmstate->p_nums,
-                        p_values,
-                        NULL,
-                        NULL,
-                        0);
+   res = pgfdw_get_result(fmstate->conn, fmstate->query);
    if (PQresultStatus(res) !=
        (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1819,18 +1825,24 @@ postgresExecForeignUpdate(EState *estate,
                                        slot);
 
    /*
-    * Execute the prepared statement, and check for success.
+    * Execute the prepared statement.
+    */
+   if (!PQsendQueryPrepared(fmstate->conn,
+                            fmstate->p_name,
+                            fmstate->p_nums,
+                            p_values,
+                            NULL,
+                            NULL,
+                            0))
+       pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+   /*
+    * Get the result, and check for success.
     *
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = PQexecPrepared(fmstate->conn,
-                        fmstate->p_name,
-                        fmstate->p_nums,
-                        p_values,
-                        NULL,
-                        NULL,
-                        0);
+   res = pgfdw_get_result(fmstate->conn, fmstate->query);
    if (PQresultStatus(res) !=
        (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1889,18 +1901,24 @@ postgresExecForeignDelete(EState *estate,
                                        NULL);
 
    /*
-    * Execute the prepared statement, and check for success.
+    * Execute the prepared statement.
+    */
+   if (!PQsendQueryPrepared(fmstate->conn,
+                            fmstate->p_name,
+                            fmstate->p_nums,
+                            p_values,
+                            NULL,
+                            NULL,
+                            0))
+       pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+   /*
+    * Get the result, and check for success.
     *
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = PQexecPrepared(fmstate->conn,
-                        fmstate->p_name,
-                        fmstate->p_nums,
-                        p_values,
-                        NULL,
-                        NULL,
-                        0);
+   res = pgfdw_get_result(fmstate->conn, fmstate->query);
    if (PQresultStatus(res) !=
        (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
@@ -1950,7 +1968,7 @@ postgresEndForeignModify(EState *estate,
         * We don't use a PG_TRY block here, so be careful not to throw error
         * without releasing the PGresult.
         */
-       res = PQexec(fmstate->conn, sql);
+       res = pgfdw_exec_query(fmstate->conn, sql);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
        PQclear(res);
@@ -2712,7 +2730,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
        /*
         * Execute EXPLAIN remotely.
         */
-       res = PQexec(conn, sql);
+       res = pgfdw_exec_query(conn, sql);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -2817,12 +2835,18 @@ create_cursor(ForeignScanState *node)
     * parameter (see deparse.c), the "inference" is trivial and will produce
     * the desired result.  This allows us to avoid assuming that the remote
     * server has the same OIDs we do for the parameters' types.
+    */
+   if (!PQsendQueryParams(conn, buf.data, numParams,
+                          NULL, values, NULL, NULL, 0))
+       pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+   /*
+    * Get the result, and check for success.
     *
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = PQexecParams(conn, buf.data, numParams, NULL, values,
-                      NULL, NULL, 0);
+   res = pgfdw_get_result(conn, buf.data);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
    PQclear(res);
@@ -2868,7 +2892,7 @@ fetch_more_data(ForeignScanState *node)
        snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
                 fsstate->fetch_size, fsstate->cursor_number);
 
-       res = PQexec(conn, sql);
+       res = pgfdw_exec_query(conn, sql);
        /* On error, report the original query, not the FETCH. */
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2978,7 +3002,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = PQexec(conn, sql);
+   res = pgfdw_exec_query(conn, sql);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, conn, true, sql);
    PQclear(res);
@@ -3006,16 +3030,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
     * with the remote server using different type OIDs than we do.  All of
     * the prepared statements we use in this module are simple enough that
     * the remote server will make the right choices.
+    */
+   if (!PQsendPrepare(fmstate->conn,
+                      p_name,
+                      fmstate->query,
+                      0,
+                      NULL))
+       pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+   /*
+    * Get the result, and check for success.
     *
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = PQprepare(fmstate->conn,
-                   p_name,
-                   fmstate->query,
-                   0,
-                   NULL);
-
+   res = pgfdw_get_result(fmstate->conn, fmstate->query);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
    PQclear(res);
@@ -3147,12 +3176,18 @@ execute_dml_stmt(ForeignScanState *node)
     * parameter (see deparse.c), the "inference" is trivial and will produce
     * the desired result.  This allows us to avoid assuming that the remote
     * server has the same OIDs we do for the parameters' types.
+    */
+   if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+                          NULL, values, NULL, NULL, 0))
+       pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+   /*
+    * Get the result, and check for success.
     *
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
-                                  numParams, NULL, values, NULL, NULL, 0);
+   dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
    if (PQresultStatus(dmstate->result) !=
        (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
        pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
@@ -3355,7 +3390,7 @@ postgresAnalyzeForeignTable(Relation relation,
    /* In what follows, do not risk leaking any PGresults. */
    PG_TRY();
    {
-       res = PQexec(conn, sql.data);
+       res = pgfdw_exec_query(conn, sql.data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -3449,7 +3484,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
    /* In what follows, do not risk leaking any PGresults. */
    PG_TRY();
    {
-       res = PQexec(conn, sql.data);
+       res = pgfdw_exec_query(conn, sql.data);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            pgfdw_report_error(ERROR, res, conn, false, sql.data);
        PQclear(res);
@@ -3500,7 +3535,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
            snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
                     fetch_size, cursor_number);
 
-           res = PQexec(conn, fetch_sql);
+           res = pgfdw_exec_query(conn, fetch_sql);
            /* On error, report the original query, not the FETCH. */
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
                pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3675,7 +3710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
        appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
        deparseStringLiteral(&buf, stmt->remote_schema);
 
-       res = PQexec(conn, buf.data);
+       res = pgfdw_exec_query(conn, buf.data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -3774,7 +3809,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
        appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
 
        /* Fetch the data */
-       res = PQexec(conn, buf.data);
+       res = pgfdw_exec_query(conn, buf.data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
index 3a11d994d54d34b18f60250e808ce22ceb4be39a..574b07d16c81ab28f2b97b79c18d1a7cc66c5c1f 100644 (file)
@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
+extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
                   bool clear, const char *sql);