libpq-be-fe-helpers.h: wrap new cancel APIs
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 28 Mar 2024 10:31:03 +0000 (11:31 +0100)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Thu, 28 Mar 2024 10:31:03 +0000 (11:31 +0100)
Commit 61461a300c1c introduced new functions to libpq for cancelling
queries.  This commit introduces a helper function that backend-side
libraries and extensions can use to invoke those.  This function takes a
timeout and can itself be interrupted while it is waiting for a cancel
request to be sent and processed, instead of being blocked.

This replaces the usage of the old functions in postgres_fdw and dblink.

Finally, it also adds some test coverage for the cancel support in
postgres_fdw.

Author: Jelte Fennema-Nio <postgres@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQT_VgOWWENUqvUV9xQmbaCyXjtRRAYO8W07oqashk_N+g@mail.gmail.com

contrib/dblink/dblink.c
contrib/postgres_fdw/connection.c
contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/sql/postgres_fdw.sql
src/include/libpq/libpq-be-fe-helpers.h

index edbc9ab02ac1510d0eadad78af965735fa2a6b2f..de858e165abdc5b779ad33de68564c217b5f5d93 100644 (file)
@@ -1347,25 +1347,16 @@ Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
    PGconn     *conn;
-   PGcancelConn *cancelConn;
    char       *msg;
+   TimestampTz endtime;
 
    dblink_init();
    conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-   cancelConn = PQcancelCreate(conn);
-
-   PG_TRY();
-   {
-       if (!PQcancelBlocking(cancelConn))
-           msg = pchomp(PQcancelErrorMessage(cancelConn));
-       else
-           msg = "OK";
-   }
-   PG_FINALLY();
-   {
-       PQcancelFinish(cancelConn);
-   }
-   PG_END_TRY();
+   endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+                                         30000);
+   msg = libpqsrv_cancel(conn, endtime);
+   if (msg == NULL)
+       msg = "OK";
 
    PG_RETURN_TEXT_P(cstring_to_text(msg));
 }
index 4931ebf591542674b673bbe0446203067ee84915..2532e453c4ec5cc6b31c9b35072cb82a73f8cadc 100644 (file)
@@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
 static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
 static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
 static bool pgfdw_cancel_query(PGconn *conn);
-static bool pgfdw_cancel_query_begin(PGconn *conn);
+static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
 static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
                                   bool consume_input);
 static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn)
    endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
                                          CONNECTION_CLEANUP_TIMEOUT);
 
-   if (!pgfdw_cancel_query_begin(conn))
+   if (!pgfdw_cancel_query_begin(conn, endtime))
        return false;
    return pgfdw_cancel_query_end(conn, endtime, false);
 }
 
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return true; if the timeout
+ * lapses without that, or the request fails for whatever reason, return
+ * false.
+ */
 static bool
-pgfdw_cancel_query_begin(PGconn *conn)
+pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
 {
-   PGcancel   *cancel;
-   char        errbuf[256];
+   char       *errormsg = libpqsrv_cancel(conn, endtime);
 
-   /*
-    * Issue cancel request.  Unfortunately, there's no good way to limit the
-    * amount of time that we might block inside PQgetCancel().
-    */
-   if ((cancel = PQgetCancel(conn)))
-   {
-       if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
-       {
-           ereport(WARNING,
-                   (errcode(ERRCODE_CONNECTION_FAILURE),
-                    errmsg("could not send cancel request: %s",
-                           errbuf)));
-           PQfreeCancel(cancel);
-           return false;
-       }
-       PQfreeCancel(cancel);
-   }
+   if (errormsg != NULL)
+       ereport(WARNING,
+               errcode(ERRCODE_CONNECTION_FAILURE),
+               errmsg("could not send cancel request: %s", errormsg));
 
-   return true;
+   return errormsg == NULL;
 }
 
 static bool
@@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
     */
    if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
    {
-       if (!pgfdw_cancel_query_begin(entry->conn))
+       TimestampTz endtime;
+
+       endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
+                                             CONNECTION_CLEANUP_TIMEOUT);
+       if (!pgfdw_cancel_query_begin(entry->conn, endtime))
            return false;       /* Unable to cancel running query */
        *cancel_requested = lappend(*cancel_requested, entry);
    }
index 3f0110c52b9edb2be76d5cb9d0f4fedb279021d5..b7af86d3511c6d898a2ad74921c3809dd85906ef 100644 (file)
@@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 (10 rows)
 
 ALTER VIEW v4 OWNER TO regress_view_owner;
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+                                                                             QUERY PLAN                                                                              
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Foreign Scan
+   Output: (count(*))
+   Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
+   Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
+(4 rows)
+
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+ERROR:  canceling statement due to statement timeout
+RESET statement_timeout;
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
index 5fffc4c53bd75edb618a044f69d28dba9398a199..6e1c819159c917c25842ce67ab6ad1626cf447b6 100644 (file)
@@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
 SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
 ALTER VIEW v4 OWNER TO regress_view_owner;
 
+-- Make sure this big CROSS JOIN query is pushed down
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
+-- Make sure query cancellation works
+SET statement_timeout = '10ms';
+select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
+RESET statement_timeout;
+
 -- ====================================================================
 -- Check that userid to use when querying the remote table is correctly
 -- propagated into foreign rels present in subqueries under an UNION ALL
index 5d33bcf32f7763eb187ca5959232beab02b7d5c5..2adf92030af311fe32598ea86bc4cebecd2213bf 100644 (file)
@@ -44,6 +44,8 @@
 #include "miscadmin.h"
 #include "storage/fd.h"
 #include "storage/latch.h"
+#include "utils/timestamp.h"
+#include "utils/wait_event.h"
 
 
 static inline void libpqsrv_connect_prepare(void);
@@ -365,4 +367,91 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
    return PQgetResult(conn);
 }
 
+/*
+ * Submit a cancel request to the given connection, waiting only until
+ * the given time.
+ *
+ * We sleep interruptibly until we receive confirmation that the cancel
+ * request has been accepted, and if it is, return NULL; if the cancel
+ * request fails, return an error message string (which is not to be
+ * freed).
+ *
+ * For other problems (to wit: OOM when strdup'ing an error message from
+ * libpq), this function can ereport(ERROR).
+ *
+ * Note: this function leaks a string's worth of memory when reporting
+ * libpq errors.  Make sure to call it in a transient memory context.
+ */
+static inline char *
+libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
+{
+   PGcancelConn *cancel_conn;
+   char       *error = NULL;
+
+   cancel_conn = PQcancelCreate(conn);
+   if (cancel_conn == NULL)
+       return _("out of memory");
+
+   /* In what follows, do not leak any PGcancelConn on any errors. */
+
+   PG_TRY();
+   {
+       if (!PQcancelStart(cancel_conn))
+       {
+           error = pchomp(PQcancelErrorMessage(cancel_conn));
+           goto exit;
+       }
+
+       for (;;)
+       {
+           PostgresPollingStatusType pollres;
+           TimestampTz now;
+           long        cur_timeout;
+           int         waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
+
+           pollres = PQcancelPoll(cancel_conn);
+           if (pollres == PGRES_POLLING_OK)
+               break;          /* success! */
+
+           /* If timeout has expired, give up, else get sleep time. */
+           now = GetCurrentTimestamp();
+           cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
+           if (cur_timeout <= 0)
+           {
+               error = _("cancel request timed out");
+               break;
+           }
+
+           switch (pollres)
+           {
+               case PGRES_POLLING_READING:
+                   waitEvents |= WL_SOCKET_READABLE;
+                   break;
+               case PGRES_POLLING_WRITING:
+                   waitEvents |= WL_SOCKET_WRITEABLE;
+                   break;
+               default:
+                   error = pchomp(PQcancelErrorMessage(cancel_conn));
+                   goto exit;
+           }
+
+           /* Sleep until there's something to do */
+           WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
+                             cur_timeout, PG_WAIT_CLIENT);
+
+           ResetLatch(MyLatch);
+
+           CHECK_FOR_INTERRUPTS();
+       }
+exit:  ;
+   }
+   PG_FINALLY();
+   {
+       PQcancelFinish(cancel_conn);
+   }
+   PG_END_TRY();
+
+   return error;
+}
+
 #endif                         /* LIBPQ_BE_FE_HELPERS_H */