After sending cancel req, wait for remote error
authorMarko Kreen <markokr@gmail.com>
Thu, 11 Oct 2012 09:06:37 +0000 (12:06 +0300)
committerMarko Kreen <markokr@gmail.com>
Fri, 12 Oct 2012 07:24:48 +0000 (10:24 +0300)
Previously, as soon as cancel requests were send,
plproxy re-throwed the error, without waiting for
reaction from backend.  Such behaviour creates
2 problems:

- If plproxy backend is closed immediately, the bouncer will
  see plproxy close before cancel from backend, thus seeing
  mid-tx close, thus dropping the connection.

- If new query comes in to plproxy backend, plproxy itself
  will see dirty connection, closing it, thus also causing
  close of server connection in bouncer.

In both cases it can cause server connection drop in pooler.
New behaviour of waiting query result should fix it.

Makefile
src/execute.c
src/plproxy.h
test/expected/plproxy_cancel.out [new file with mode: 0644]
test/sql/plproxy_cancel.sql [new file with mode: 0644]

index 7d890975737f2f8ae45afb8de9a0a2635533f283..cdef2c59d3ed68a7ffe83b192ac743b5be332463 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -36,7 +36,8 @@ DISTNAME = $(EXTENSION)-$(DISTVERSION)
 # regression testing setup
 REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \
      plproxy_errors plproxy_clustermap plproxy_dynamic_record \
-     plproxy_encoding plproxy_split plproxy_target plproxy_table
+     plproxy_encoding plproxy_split plproxy_target plproxy_table \
+     plproxy_cancel
 REGRESS_OPTS = --dbname=regression --inputdir=test
 # pg9.1 ignores --dbname
 override CONTRIB_TESTDB := regression
index 926baa3be7afd4863a5327af4e171b9c9b3e243e..bfd4b0f7892a9b3fe62adfd1ac1557cd6744f1e7 100644 (file)
@@ -401,6 +401,8 @@ prepare_conn(ProxyFunction *func, ProxyConnection *conn)
 
        gettimeofday(&now, NULL);
 
+       conn->cur->waitCancel = 0;
+
        /* state should be C_READY or C_NONE */
        switch (conn->cur->state)
        {
@@ -456,6 +458,7 @@ another_result(ProxyFunction *func, ProxyConnection *conn)
        res = PQgetResult(conn->cur->db);
        if (res == NULL)
        {
+               conn->cur->waitCancel = 0;
                if (conn->cur->tuning)
                        conn->cur->state = C_READY;
                else
@@ -463,6 +466,13 @@ another_result(ProxyFunction *func, ProxyConnection *conn)
                return false;
        }
 
+       /* ignore result when waiting for cancel */
+       if (conn->cur->waitCancel)
+       {
+               PQclear(res);
+               return true;
+       }
+
        switch (PQresultStatus(res))
        {
                case PGRES_TUPLES_OK:
@@ -778,6 +788,58 @@ remote_execute(ProxyFunction *func)
        }
 }
 
+static void
+remote_wait_for_cancel(ProxyFunction *func)
+{
+       ProxyConnection *conn;
+       ProxyCluster *cluster = func->cur_cluster;
+       int                     i,
+                               pending = 1;
+       struct timeval now;
+
+       /* now loop until all results are arrived */
+       while (pending)
+       {
+               /* allow postgres to cancel processing */
+               CHECK_FOR_INTERRUPTS();
+
+               /* wait for events */
+               if (poll_conns(func, cluster) == 0)
+                       continue;
+
+               /* recheck */
+               pending = 0;
+               gettimeofday(&now, NULL);
+               for (i = 0; i < cluster->active_count; i++)
+               {
+                       conn = cluster->active_list[i];
+                       if (!conn->run_tag)
+                               continue;
+
+                       if (conn->cur->state != C_DONE)
+                               pending++;
+                       check_timeouts(func, cluster, conn, now.tv_sec);
+               }
+       }
+
+       /* review results, calculate total */
+       for (i = 0; i < cluster->active_count; i++)
+       {
+               conn = cluster->active_list[i];
+
+               if (!conn->run_tag)
+                       continue;
+
+               if (conn->cur->state != C_DONE)
+                       plproxy_error(func, "Unfinished connection: %d", conn->cur->state);
+               if (conn->res != NULL)
+               {
+                       PQclear(conn->res);
+                       conn->res = NULL;
+               }
+       }
+}
+
 static void
 remote_cancel(ProxyFunction *func)
 {
@@ -794,22 +856,35 @@ remote_cancel(ProxyFunction *func)
        for (i = 0; i < cluster->active_count; i++)
        {
                conn = cluster->active_list[i];
-               if (conn->cur->state == C_NONE ||
-                       conn->cur->state == C_READY ||
-                       conn->cur->state == C_DONE)
-                       continue;
-
-               cancel = PQgetCancel(conn->cur->db);
-               if (cancel == NULL)
+               switch (conn->cur->state)
                {
-                       elog(NOTICE, "Invalid connection!");
-                       continue;
+                       case C_NONE:
+                       case C_READY:
+                       case C_DONE:
+                               break;
+                       case C_QUERY_WRITE:
+                       case C_CONNECT_READ:
+                       case C_CONNECT_WRITE:
+                               plproxy_disconnect(conn->cur);
+                               break;
+                       case C_QUERY_READ:
+                               cancel = PQgetCancel(conn->cur->db);
+                               if (cancel == NULL)
+                               {
+                                       elog(NOTICE, "Invalid connection!");
+                                       continue;
+                               }
+                               ret = PQcancel(cancel, errbuf, sizeof(errbuf));
+                               PQfreeCancel(cancel);
+                               if (ret == 0)
+                                       elog(NOTICE, "Cancel query failed!");
+                               else
+                                       conn->cur->waitCancel = 1;
+                               break;
                }
-               ret = PQcancel(cancel, errbuf, sizeof(errbuf));
-               PQfreeCancel(cancel);
-               if (ret == 0)
-                       elog(NOTICE, "Cancel query failed!");
        }
+
+       remote_wait_for_cancel(func);
 }
 
 /*
@@ -1166,6 +1241,7 @@ void plproxy_disconnect(ProxyConnectionState *cur)
        cur->query_time = 0;
        cur->same_ver = 0;
        cur->tuning = 0;
+       cur->waitCancel = 0;
 }
 
 /* Select partitions and execute query on them */
index c262b7230185239765db4e4d8caf816e7bd267bd..da9b9d47528257dd879178221b9380bd8a6f4a5b 100644 (file)
@@ -169,6 +169,7 @@ typedef struct ProxyConnectionState {
        time_t          query_time;             /* When last query was sent */
        bool            same_ver;               /* True if dest backend has same X.Y ver */
        bool            tuning;                 /* True if tuning query is running on conn */
+       bool            waitCancel;             /* True if waiting for answer from cancel */
 } ProxyConnectionState;
 
 /* Single database connection */
diff --git a/test/expected/plproxy_cancel.out b/test/expected/plproxy_cancel.out
new file mode 100644 (file)
index 0000000..605471f
--- /dev/null
@@ -0,0 +1,27 @@
+-- test canceling
+\c test_part0
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part1
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part2
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part3
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c regression
+create function rsleep(val int4, out res int4) returns setof int4 as $$
+    cluster 'testcluster';
+    run on all;
+$$ language plproxy;
+set statement_timeout = '1000';
+select * from rsleep(10);
+ERROR:  canceling statement due to statement timeout
+-- test if works later
+select * from rsleep(0);
+ res 
+-----
+   1
+   1
+   1
+   1
+(4 rows)
+
diff --git a/test/sql/plproxy_cancel.sql b/test/sql/plproxy_cancel.sql
new file mode 100644 (file)
index 0000000..e24fe03
--- /dev/null
@@ -0,0 +1,23 @@
+
+-- test canceling
+
+\c test_part0
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part1
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part2
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c test_part3
+create function rsleep(val int4) returns int4 as $$ begin perform pg_sleep(val); return 1; end; $$ language plpgsql;
+\c regression
+create function rsleep(val int4, out res int4) returns setof int4 as $$
+    cluster 'testcluster';
+    run on all;
+$$ language plproxy;
+
+set statement_timeout = '1000';
+select * from rsleep(10);
+
+-- test if works later
+select * from rsleep(0);
+