Add tests for libpq query cancellation APIs
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 11 Mar 2024 20:54:03 +0000 (21:54 +0100)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Mon, 11 Mar 2024 20:54:03 +0000 (21:54 +0100)
This is in preparation of making changes and additions to these APIs.

Author: Jelte Fennema-Nio <postgres@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQRb21spiiykQ48rzz8w+Hcykz+mB2_hxR65D9Qk6nnw=w@mail.gmail.com

src/test/modules/libpq_pipeline/libpq_pipeline.c

index 5f43aa40de4399d5f02ca57f64a68d2e48827281..97f8febf929859fea0b9c0a4b32661a0e362f14f 100644 (file)
@@ -64,6 +64,11 @@ exit_nicely(PGconn *conn)
    exit(1);
 }
 
+/*
+ * The following few functions are wrapped in macros to make the reported line
+ * number in an error match the line number of the invocation.
+ */
+
 /*
  * Print an error to stderr and terminate the program.
  */
@@ -74,7 +79,6 @@ pg_fatal_impl(int line, const char *fmt,...)
 {
    va_list     args;
 
-
    fflush(stdout);
 
    fprintf(stderr, "\n%s:%d: ", progname, line);
@@ -86,6 +90,170 @@ pg_fatal_impl(int line, const char *fmt,...)
    exit(1);
 }
 
+/*
+ * Check that the query on the given connection got canceled.
+ */
+#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
+static void
+confirm_query_canceled_impl(int line, PGconn *conn)
+{
+   PGresult   *res = NULL;
+
+   res = PQgetResult(conn);
+   if (res == NULL)
+       pg_fatal_impl(line, "PQgetResult returned null: %s",
+                     PQerrorMessage(conn));
+   if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+       pg_fatal_impl(line, "query did not fail when it was expected");
+   if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
+       pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
+                     PQerrorMessage(conn));
+   PQclear(res);
+
+   while (PQisBusy(conn))
+       PQconsumeInput(conn);
+}
+
+#define send_cancellable_query(conn, monitorConn) \
+   send_cancellable_query_impl(__LINE__, conn, monitorConn)
+static void
+send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
+{
+   const char *env_wait;
+   const Oid   paramTypes[1] = {INT4OID};
+   int         procpid = PQbackendPID(conn);
+
+   env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
+   if (env_wait == NULL)
+       env_wait = "180";
+
+   if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
+                         &env_wait, NULL, NULL, 0) != 1)
+       pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
+
+   /*
+    * Wait until the query is actually running. Otherwise sending a
+    * cancellation request might not cancel the query due to race conditions.
+    */
+   while (true)
+   {
+       char       *value;
+       PGresult   *res;
+       const char *paramValues[1];
+       char        pidval[16];
+
+       snprintf(pidval, 16, "%d", procpid);
+       paramValues[0] = pidval;
+
+       res = PQexecParams(monitorConn,
+                          "SELECT count(*) FROM pg_stat_activity WHERE "
+                          "pid = $1 AND state = 'active'",
+                          1, NULL, paramValues, NULL, NULL, 1);
+
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+           pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
+       if (PQntuples(res) != 1)
+           pg_fatal("unexpected number of rows received: %d", PQntuples(res));
+       if (PQnfields(res) != 1)
+           pg_fatal("unexpected number of columns received: %d", PQnfields(res));
+       value = PQgetvalue(res, 0, 0);
+       if (*value != '0')
+       {
+           PQclear(res);
+           break;
+       }
+       PQclear(res);
+
+       /* wait 10ms before polling again */
+       pg_usleep(10000);
+   }
+}
+
+/*
+ * Create a new connection with the same conninfo as the given one.
+ */
+static PGconn *
+copy_connection(PGconn *conn)
+{
+   PGconn     *copyConn;
+   PQconninfoOption *opts = PQconninfo(conn);
+   const char **keywords;
+   const char **vals;
+   int         nopts = 1;
+   int         i = 0;
+
+   for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
+       nopts++;
+
+   keywords = pg_malloc(sizeof(char *) * nopts);
+   vals = pg_malloc(sizeof(char *) * nopts);
+
+   for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
+   {
+       if (opt->val)
+       {
+           keywords[i] = opt->keyword;
+           vals[i] = opt->val;
+           i++;
+       }
+   }
+   keywords[i] = vals[i] = NULL;
+
+   copyConn = PQconnectdbParams(keywords, vals, false);
+
+   if (PQstatus(copyConn) != CONNECTION_OK)
+       pg_fatal("Connection to database failed: %s",
+                PQerrorMessage(copyConn));
+
+   return copyConn;
+}
+
+/*
+ * Test query cancellation routines
+ */
+static void
+test_cancel(PGconn *conn)
+{
+   PGcancel   *cancel;
+   PGconn     *monitorConn;
+   char        errorbuf[256];
+
+   fprintf(stderr, "test cancellations... ");
+
+   if (PQsetnonblocking(conn, 1) != 0)
+       pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+   /*
+    * Make a separate connection to the database to monitor the query on the
+    * main connection.
+    */
+   monitorConn = copy_connection(conn);
+   Assert(PQstatus(monitorConn) == CONNECTION_OK);
+
+   /* test PQcancel */
+   send_cancellable_query(conn, monitorConn);
+   cancel = PQgetCancel(conn);
+   if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+       pg_fatal("failed to run PQcancel: %s", errorbuf);
+   confirm_query_canceled(conn);
+
+   /* PGcancel object can be reused for the next query */
+   send_cancellable_query(conn, monitorConn);
+   if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+       pg_fatal("failed to run PQcancel: %s", errorbuf);
+   confirm_query_canceled(conn);
+
+   PQfreeCancel(cancel);
+
+   /* test PQrequestCancel */
+   send_cancellable_query(conn, monitorConn);
+   if (!PQrequestCancel(conn))
+       pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
+   confirm_query_canceled(conn);
+
+   fprintf(stderr, "ok\n");
+}
+
 static void
 test_disallowed_in_pipeline(PGconn *conn)
 {
@@ -1789,6 +1957,7 @@ usage(const char *progname)
 static void
 print_test_list(void)
 {
+   printf("cancel\n");
    printf("disallowed_in_pipeline\n");
    printf("multi_pipelines\n");
    printf("nosync\n");
@@ -1890,7 +2059,9 @@ main(int argc, char **argv)
                        PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    }
 
-   if (strcmp(testname, "disallowed_in_pipeline") == 0)
+   if (strcmp(testname, "cancel") == 0)
+       test_cancel(conn);
+   else if (strcmp(testname, "disallowed_in_pipeline") == 0)
        test_disallowed_in_pipeline(conn);
    else if (strcmp(testname, "multi_pipelines") == 0)
        test_multi_pipelines(conn);