exit(1);
}
+/*
+ * The following few functions are wrapped in macros to make the reported line
+ * number in an error match the line number of the invocation.
+ */
+
/*
* Print an error to stderr and terminate the program.
*/
{
va_list args;
-
fflush(stdout);
fprintf(stderr, "\n%s:%d: ", progname, line);
exit(1);
}
+/*
+ * Check that the query on the given connection got canceled.
+ */
+#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
+static void
+confirm_query_canceled_impl(int line, PGconn *conn)
+{
+ PGresult *res = NULL;
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal_impl(line, "PQgetResult returned null: %s",
+ PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ pg_fatal_impl(line, "query did not fail when it was expected");
+ if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
+ pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
+ PQerrorMessage(conn));
+ PQclear(res);
+
+ while (PQisBusy(conn))
+ PQconsumeInput(conn);
+}
+
+#define send_cancellable_query(conn, monitorConn) \
+ send_cancellable_query_impl(__LINE__, conn, monitorConn)
+static void
+send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
+{
+ const char *env_wait;
+ const Oid paramTypes[1] = {INT4OID};
+ int procpid = PQbackendPID(conn);
+
+ env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (env_wait == NULL)
+ env_wait = "180";
+
+ if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
+ &env_wait, NULL, NULL, 0) != 1)
+ pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
+
+ /*
+ * Wait until the query is actually running. Otherwise sending a
+ * cancellation request might not cancel the query due to race conditions.
+ */
+ while (true)
+ {
+ char *value;
+ PGresult *res;
+ const char *paramValues[1];
+ char pidval[16];
+
+ snprintf(pidval, 16, "%d", procpid);
+ paramValues[0] = pidval;
+
+ res = PQexecParams(monitorConn,
+ "SELECT count(*) FROM pg_stat_activity WHERE "
+ "pid = $1 AND state = 'active'",
+ 1, NULL, paramValues, NULL, NULL, 1);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
+ if (PQntuples(res) != 1)
+ pg_fatal("unexpected number of rows received: %d", PQntuples(res));
+ if (PQnfields(res) != 1)
+ pg_fatal("unexpected number of columns received: %d", PQnfields(res));
+ value = PQgetvalue(res, 0, 0);
+ if (*value != '0')
+ {
+ PQclear(res);
+ break;
+ }
+ PQclear(res);
+
+ /* wait 10ms before polling again */
+ pg_usleep(10000);
+ }
+}
+
+/*
+ * Create a new connection with the same conninfo as the given one.
+ */
+static PGconn *
+copy_connection(PGconn *conn)
+{
+ PGconn *copyConn;
+ PQconninfoOption *opts = PQconninfo(conn);
+ const char **keywords;
+ const char **vals;
+ int nopts = 1;
+ int i = 0;
+
+ for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
+ nopts++;
+
+ keywords = pg_malloc(sizeof(char *) * nopts);
+ vals = pg_malloc(sizeof(char *) * nopts);
+
+ for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
+ {
+ if (opt->val)
+ {
+ keywords[i] = opt->keyword;
+ vals[i] = opt->val;
+ i++;
+ }
+ }
+ keywords[i] = vals[i] = NULL;
+
+ copyConn = PQconnectdbParams(keywords, vals, false);
+
+ if (PQstatus(copyConn) != CONNECTION_OK)
+ pg_fatal("Connection to database failed: %s",
+ PQerrorMessage(copyConn));
+
+ return copyConn;
+}
+
+/*
+ * Test query cancellation routines
+ */
+static void
+test_cancel(PGconn *conn)
+{
+ PGcancel *cancel;
+ PGconn *monitorConn;
+ char errorbuf[256];
+
+ fprintf(stderr, "test cancellations... ");
+
+ if (PQsetnonblocking(conn, 1) != 0)
+ pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
+
+ /*
+ * Make a separate connection to the database to monitor the query on the
+ * main connection.
+ */
+ monitorConn = copy_connection(conn);
+ Assert(PQstatus(monitorConn) == CONNECTION_OK);
+
+ /* test PQcancel */
+ send_cancellable_query(conn, monitorConn);
+ cancel = PQgetCancel(conn);
+ if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+ pg_fatal("failed to run PQcancel: %s", errorbuf);
+ confirm_query_canceled(conn);
+
+ /* PGcancel object can be reused for the next query */
+ send_cancellable_query(conn, monitorConn);
+ if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
+ pg_fatal("failed to run PQcancel: %s", errorbuf);
+ confirm_query_canceled(conn);
+
+ PQfreeCancel(cancel);
+
+ /* test PQrequestCancel */
+ send_cancellable_query(conn, monitorConn);
+ if (!PQrequestCancel(conn))
+ pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
+ confirm_query_canceled(conn);
+
+ fprintf(stderr, "ok\n");
+}
+
static void
test_disallowed_in_pipeline(PGconn *conn)
{
static void
print_test_list(void)
{
+ printf("cancel\n");
printf("disallowed_in_pipeline\n");
printf("multi_pipelines\n");
printf("nosync\n");
PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
}
- if (strcmp(testname, "disallowed_in_pipeline") == 0)
+ if (strcmp(testname, "cancel") == 0)
+ test_cancel(conn);
+ else if (strcmp(testname, "disallowed_in_pipeline") == 0)
test_disallowed_in_pipeline(conn);
else if (strcmp(testname, "multi_pipelines") == 0)
test_multi_pipelines(conn);