Fix query cache bug in streaming replication mode.
authorTatsuo Ishii <ishii@postgresql.org>
Wed, 4 Dec 2024 12:38:23 +0000 (21:38 +0900)
committerTatsuo Ishii <ishii@postgresql.org>
Thu, 5 Dec 2024 04:37:03 +0000 (13:37 +0900)
When query cache is enabled and an execute message is sent from
frontend, pgpool injects query cache data into backend message buffer
if query cache data is available. inject_cached_message() is
responsible for the task. But it had an oversight if the message
stream from frontend includes more than one set of bind or describe
message before a sync message. It tried to determine the frontend
message end by finding a bind complete or a row description message
from backend. But in that case, it is possible that these messages do
not indicate the message stream end because there are more bind
complete or row description messages. As a result the cached message is
inserted at an inappropriate position and pgpool mistakenly raised a "kind
mismatch" error.

This commit changes the algorithm to detect the message stream end:
compare the number of messages from backend with the pending message
queue length. When a message is read from backend, the counter for the
number of messages is counted up if the message is one of parse
complete, bind complete, close complete, command complete, portal
suspended or row description. For other message types the counter is
not counted up. If the counter reaches the pending message queue
length, we are at the end of message stream and inject the cached
messages.

Test cases for 006.memqcache are added.

Backpatch-through: v4.2.

src/query_cache/pool_memqcache.c
src/test/regression/tests/006.memqcache/expected.1 [new file with mode: 0644]
src/test/regression/tests/006.memqcache/expected.2 [new file with mode: 0644]
src/test/regression/tests/006.memqcache/expected.3 [new file with mode: 0644]
src/test/regression/tests/006.memqcache/query_cache_bug1.data [new file with mode: 0644]
src/test/regression/tests/006.memqcache/query_cache_bug2.data [new file with mode: 0644]
src/test/regression/tests/006.memqcache/query_cache_bug3.data [new file with mode: 0644]
src/test/regression/tests/006.memqcache/test.sh

index 7f824fe2277fd2487964479c08e6e48460e5c811..35c0f56edf1ab4b711067017580f9faf7f469a25 100644 (file)
@@ -4600,21 +4600,24 @@ inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen)
        int                     i = 0;
        bool            is_prepared_stmt = false;
        POOL_SESSION_CONTEXT *session_context;
-       POOL_QUERY_CONTEXT *query_context;
        POOL_PENDING_MESSAGE *msg;
+       int                     num_msgs;
+       int                     msg_cnt = 0;
 
        session_context = pool_get_session_context(false);
-       query_context = session_context->query_context;
-       msg = pool_pending_message_find_lastest_by_query_context(query_context);
+       msg = pool_pending_message_head_message();
+       num_msgs = list_length(session_context->pending_messages);
 
        if (msg)
        {
                /*
-                * If pending message found, we should extract target backend from it
+                * If pending message found, we should extract data from the target
+                * backend.
                 */
                int                     backend_id;
 
                backend_id = pool_pending_message_get_target_backend_id(msg);
+               pool_pending_message_free_pending_message(msg);
                backend = CONNECTION(session_context->backend, backend_id);
                timeout = -1;
        }
@@ -4645,15 +4648,50 @@ inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen)
                        pool_set_timeout(-1);
                }
 
+               /* read one message from backend */
                pool_read(backend, &kind, 1);
-               ereport(DEBUG1,
-                               (errmsg("inject_cached_message: push message kind: '%c'", kind)));
-               if (msg &&
-                       ((kind == 'T' && msg->type == POOL_DESCRIBE) ||
-                        (kind == '2' && msg->type == POOL_BIND)))
+
+               /*
+                * Count up number of received messages to compare with the number of
+                * pending messages
+                */
+               switch(kind)
+               {
+                       case '1':       /* parse complete */
+                       case '2':       /* bind complete */
+                       case '3':       /* close complete */
+                       case 'C':       /* command complete */
+                       case 's':       /* portal suspended */
+                       case 'T':       /* row description */
+                               msg_cnt++;      /* count up number of messages */
+                               elog(DEBUG1, "count up message %c msg_cnt: %d", kind, msg_cnt);
+                               break;
+                       case 'E':       /* ErrorResponse */
+                               /*
+                                * If we receive ErrorResponse, it is likely that the last
+                                * Execute caused an error and we can stop reading messages
+                                * from backend.
+                                */
+                               timeout = 0;
+                               break;
+                       default:
+                               /* we do not count other messages */
+                               break;
+               }
+
+               /*
+                * If msg count is greater than or equal to the number of pending
+                * messages, it is likely all necessary backend messages have been
+                * already seen.
+                */
+               if (msg_cnt >= num_msgs)
                {
-                       /* Pending message seen. Now it is likely to end of pending data */
+                       /*
+                        * Set timeout to 0 so that we do not need to wait for responses
+                        * from backend in vain.
+                        */
                        timeout = 0;
+                       elog(DEBUG1, "num_msgs: %d msg_cnt: %d", num_msgs, msg_cnt);
                }
                pool_push(backend, &kind, sizeof(kind));
                pool_read(backend, &len, sizeof(len));
diff --git a/src/test/regression/tests/006.memqcache/expected.1 b/src/test/regression/tests/006.memqcache/expected.1
new file mode 100644 (file)
index 0000000..0475eca
--- /dev/null
@@ -0,0 +1,30 @@
+FE=> Parse(stmt="S0", query="SELECT 1")
+FE=> Bind(stmt="S0", portal="P0")
+FE=> Describe(portal="P0")
+FE=> Execute(portal="P0")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT 2")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Bind(stmt="S0", portal="P2")
+FE=> Describe(portal="P2")
+FE=> Execute(portal="P2")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Terminate
diff --git a/src/test/regression/tests/006.memqcache/expected.2 b/src/test/regression/tests/006.memqcache/expected.2
new file mode 100644 (file)
index 0000000..11dbdad
--- /dev/null
@@ -0,0 +1,25 @@
+FE=> Parse(stmt="S0", query="SELECT 1")
+FE=> Bind(stmt="S0", portal="P0")
+FE=> Describe(portal="P0")
+FE=> Execute(portal="P0")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT setseed(10)")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Bind(stmt="S0", portal="P2")
+FE=> Describe(portal="P2")
+FE=> Execute(portal="P2")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE ErrorResponse(S ERROR V ERROR C 22023 M setseed parameter 10 is out of allowed range [-1,1]
+<= BE ReadyForQuery(I)
+FE=> Terminate
diff --git a/src/test/regression/tests/006.memqcache/expected.3 b/src/test/regression/tests/006.memqcache/expected.3
new file mode 100644 (file)
index 0000000..05abf78
--- /dev/null
@@ -0,0 +1,32 @@
+FE=> Parse(stmt="S0", query="SELECT * FROM (VALUES(1),(2))")
+FE=> Bind(stmt="S0", portal="P0")
+FE=> Describe(portal="P0")
+FE=> Execute(portal="P0")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT * FROM (VALUES(1),(2),(3))")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Bind(stmt="S0", portal="P2")
+FE=> Describe(portal="P2")
+FE=> Execute(portal="P2")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE PortalSuspended
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Terminate
diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug1.data b/src/test/regression/tests/006.memqcache/query_cache_bug1.data
new file mode 100644 (file)
index 0000000..dfc8428
--- /dev/null
@@ -0,0 +1,21 @@
+# "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']"
+'P'    "S0"    "SELECT 1"      0
+'B'    "P0"    "S0"    0       0       0   
+'D'    'P'     "P0"
+'E'    "P0"    0   
+'S'
+'Y'
+
+'P'    "S1"    "SELECT 2"      0
+'B'    "P1"    "S1"    0       0       0   
+'D'    'P'     "P1"
+'E'    "P1"    0   
+
+'B'    "P2"    "S0"    0       0       0   
+'D'    'P'     "P2"
+'E'    "P2"    0   
+
+'S'
+'Y'
+
+'X'
diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug2.data b/src/test/regression/tests/006.memqcache/query_cache_bug2.data
new file mode 100644 (file)
index 0000000..e9e14ee
--- /dev/null
@@ -0,0 +1,23 @@
+# test case including error response
+
+'P'    "S0"    "SELECT 1"      0
+'B'    "P0"    "S0"    0       0       0   
+'D'    'P'     "P0"
+'E'    "P0"    0   
+'S'
+'Y'
+
+# setseed parameter out of range error
+'P'    "S1"    "SELECT setseed(10)"    0
+'B'    "P1"    "S1"    0       0       0   
+'D'    'P'     "P1"
+'E'    "P1"    0   
+
+'B'    "P2"    "S0"    0       0       0   
+'D'    'P'     "P2"
+'E'    "P2"    0   
+
+'S'
+'Y'
+
+'X'
diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug3.data b/src/test/regression/tests/006.memqcache/query_cache_bug3.data
new file mode 100644 (file)
index 0000000..790f21a
--- /dev/null
@@ -0,0 +1,21 @@
+# Portal suspended test
+
+'P'    "S0"    "SELECT * FROM (VALUES(1),(2))" 0
+'B'    "P0"    "S0"    0       0       0   
+'D'    'P'     "P0"
+'E'    "P0"    0   
+'S'
+'Y'
+
+'P'    "S1"    "SELECT * FROM (VALUES(1),(2),(3))"     0
+'B'    "P1"    "S1"    0       0       0   
+'D'    'P'     "P1"
+'E'    "P1"    1
+
+'B'    "P2"    "S0"    0       0       0   
+'D'    'P'     "P2"
+'E'    "P2"    0
+'S'
+'Y'
+
+'X'
index fa9f4b15e30522931f233b21953781b6d7821516..fa8dd556714e1630c8af508115826e4fe9a428c6 100755 (executable)
@@ -519,4 +519,56 @@ EOF
        cd ..
 done
 
+#
+# Test for extended query protocol corner cases in streaming replication mode.
+# These tests are basically for a sequence of extended queries:
+# 1. execute a SELECT and create query cache entry.
+# 2. sync.
+# 3. execute another SELECT.
+# 4. execute bind and execute to use the query cache created at #1.
+# 5. sync.
+
+rm -fr $TESTDIR
+mkdir $TESTDIR
+cd $TESTDIR
+
+# create test environment
+echo -n "creating test environment..."
+$PGPOOL_SETUP -m s -n 2 || exit 1
+echo "done."
+
+echo "memory_cache_enabled = on" >> etc/pgpool.conf
+cd ..
+
+for i in 1 2 3
+do
+    #
+    # case 1: failed with kind mismatch error at #5.
+    # "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']"
+    #
+    # case 2: step #4 includes error (hung).
+    #
+    # case 3: step #4 includes PortalSuspended (hung).
+    cd $TESTDIR
+    ./startall
+    wait_for_pgpool_startup
+    timeout 1 $PGPROTO -d test -f ../query_cache_bug$i.data |& del_details_from_error > result
+    if [ $? != 0 ];then
+       # timeout happened or pgproto returned non 0 status
+       echo "test failed in test case #2 (timeout)"
+       err=true
+       ./shutdownall
+       exit 1
+    fi
+    ./shutdownall
+    cd ..
+    diff -c expected.$i $TESTDIR/result > $log
+    if [ $? != 0 ];then
+       echo "test failed in test case $i"
+       cat $log
+       rm $log
+       exit 1
+    fi
+done
+
 exit 0