gettimeofday(&now, NULL);
+ conn->cur->waitCancel = 0;
+
/* state should be C_READY or C_NONE */
switch (conn->cur->state)
{
res = PQgetResult(conn->cur->db);
if (res == NULL)
{
+ conn->cur->waitCancel = 0;
if (conn->cur->tuning)
conn->cur->state = C_READY;
else
return false;
}
+ /* ignore result when waiting for cancel */
+ if (conn->cur->waitCancel)
+ {
+ PQclear(res);
+ return true;
+ }
+
switch (PQresultStatus(res))
{
case PGRES_TUPLES_OK:
}
}
+static void
+remote_wait_for_cancel(ProxyFunction *func)
+{
+ ProxyConnection *conn;
+ ProxyCluster *cluster = func->cur_cluster;
+ int i,
+ pending = 1;
+ struct timeval now;
+
+ /* now loop until all results are arrived */
+ while (pending)
+ {
+ /* allow postgres to cancel processing */
+ CHECK_FOR_INTERRUPTS();
+
+ /* wait for events */
+ if (poll_conns(func, cluster) == 0)
+ continue;
+
+ /* recheck */
+ pending = 0;
+ gettimeofday(&now, NULL);
+ for (i = 0; i < cluster->active_count; i++)
+ {
+ conn = cluster->active_list[i];
+ if (!conn->run_tag)
+ continue;
+
+ if (conn->cur->state != C_DONE)
+ pending++;
+ check_timeouts(func, cluster, conn, now.tv_sec);
+ }
+ }
+
+ /* review results, calculate total */
+ for (i = 0; i < cluster->active_count; i++)
+ {
+ conn = cluster->active_list[i];
+
+ if (!conn->run_tag)
+ continue;
+
+ if (conn->cur->state != C_DONE)
+ plproxy_error(func, "Unfinished connection: %d", conn->cur->state);
+ if (conn->res != NULL)
+ {
+ PQclear(conn->res);
+ conn->res = NULL;
+ }
+ }
+}
+
static void
remote_cancel(ProxyFunction *func)
{
for (i = 0; i < cluster->active_count; i++)
{
conn = cluster->active_list[i];
- if (conn->cur->state == C_NONE ||
- conn->cur->state == C_READY ||
- conn->cur->state == C_DONE)
- continue;
-
- cancel = PQgetCancel(conn->cur->db);
- if (cancel == NULL)
+ switch (conn->cur->state)
{
- elog(NOTICE, "Invalid connection!");
- continue;
+ case C_NONE:
+ case C_READY:
+ case C_DONE:
+ break;
+ case C_QUERY_WRITE:
+ case C_CONNECT_READ:
+ case C_CONNECT_WRITE:
+ plproxy_disconnect(conn->cur);
+ break;
+ case C_QUERY_READ:
+ cancel = PQgetCancel(conn->cur->db);
+ if (cancel == NULL)
+ {
+ elog(NOTICE, "Invalid connection!");
+ continue;
+ }
+ ret = PQcancel(cancel, errbuf, sizeof(errbuf));
+ PQfreeCancel(cancel);
+ if (ret == 0)
+ elog(NOTICE, "Cancel query failed!");
+ else
+ conn->cur->waitCancel = 1;
+ break;
}
- ret = PQcancel(cancel, errbuf, sizeof(errbuf));
- PQfreeCancel(cancel);
- if (ret == 0)
- elog(NOTICE, "Cancel query failed!");
}
+
+ remote_wait_for_cancel(func);
}
/*
cur->query_time = 0;
cur->same_ver = 0;
cur->tuning = 0;
+ cur->waitCancel = 0;
}
/* Select partitions and execute query on them */
--- /dev/null
+-- test canceling
+\c test_part0
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part1
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part2
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part3
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c regression
+create function rsleep(val int4, out res int4) returns setof int4 as $$
+ cluster 'testcluster';
+ run on all;
+$$ language plproxy;
+set statement_timeout = '1000';
+select * from rsleep(10);
+ERROR: canceling statement due to statement timeout
+-- test if works later
+select * from rsleep(0);
+ res
+-----
+ 1
+ 1
+ 1
+ 1
+(4 rows)
+
--- /dev/null
+
+-- test canceling
+
+\c test_part0
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part1
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part2
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part3
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c regression
+create function rsleep(val int4, out res int4) returns setof int4 as $$
+ cluster 'testcluster';
+ run on all;
+$$ language plproxy;
+
+set statement_timeout = '1000';
+select * from rsleep(10);
+
+-- test if works later
+select * from rsleep(0);
+