From 4dd7371c2c924daa3544732fc5342d2cf24c8956 Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Wed, 4 Dec 2024 21:38:23 +0900 Subject: [PATCH] Fix query cache bug in streaming replication mode. When query cache is enabled and an execute message is sent from frontend, pgpool injects query cache data into backend message buffer if query cache data is available. inject_cached_message() is responsible for the task. But it had an oversight if the message stream from frontend includes more than one sets of bind or describe message before a sync message. It tried to determine the frontend message end by finding a bind complete or a row description message from backend. But in the case, it is possible that these messages do not indicate the message stream end because there are one more bind complete or row description message. As a result the cached message is inserted at inappropriate positron and pgpool mistakenly raised "kind mismatch" error. This commit changes the algorithm to detect the message stream end: compare the number of messages from backend with the pending message queue length. When a message is read from backend, the counter for the number of message is counted up if the message is one of parse complete, bind complete, close complete, command compete, portal suspended or row description. For other message type the counter is not counted up. If the counter reaches to the pending message queue length, we are at the end of message stream and inject the cahced messages. Test cases for 006.memqcache are added. Backpatch-through: v4.2. --- src/query_cache/pool_memqcache.c | 58 +++++++++++++++---- .../regression/tests/006.memqcache/expected.1 | 30 ++++++++++ .../regression/tests/006.memqcache/expected.2 | 25 ++++++++ .../regression/tests/006.memqcache/expected.3 | 32 ++++++++++ .../tests/006.memqcache/query_cache_bug1.data | 21 +++++++ .../tests/006.memqcache/query_cache_bug2.data | 23 ++++++++ .../tests/006.memqcache/query_cache_bug3.data | 21 +++++++ .../regression/tests/006.memqcache/test.sh | 52 +++++++++++++++++ 8 files changed, 252 insertions(+), 10 deletions(-) create mode 100644 src/test/regression/tests/006.memqcache/expected.1 create mode 100644 src/test/regression/tests/006.memqcache/expected.2 create mode 100644 src/test/regression/tests/006.memqcache/expected.3 create mode 100644 src/test/regression/tests/006.memqcache/query_cache_bug1.data create mode 100644 src/test/regression/tests/006.memqcache/query_cache_bug2.data create mode 100644 src/test/regression/tests/006.memqcache/query_cache_bug3.data diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c index 7f824fe22..35c0f56ed 100644 --- a/src/query_cache/pool_memqcache.c +++ b/src/query_cache/pool_memqcache.c @@ -4600,21 +4600,24 @@ inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen) int i = 0; bool is_prepared_stmt = false; POOL_SESSION_CONTEXT *session_context; - POOL_QUERY_CONTEXT *query_context; POOL_PENDING_MESSAGE *msg; + int num_msgs; + int msg_cnt = 0; session_context = pool_get_session_context(false); - query_context = session_context->query_context; - msg = pool_pending_message_find_lastest_by_query_context(query_context); + msg = pool_pending_message_head_message(); + num_msgs = list_length(session_context->pending_messages); if (msg) { /* - * If pending message found, we should extract target backend from it + * If pending message found, we should extract data from the target + * backend. */ int backend_id; backend_id = pool_pending_message_get_target_backend_id(msg); + pool_pending_message_free_pending_message(msg); backend = CONNECTION(session_context->backend, backend_id); timeout = -1; } @@ -4645,15 +4648,50 @@ inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen) pool_set_timeout(-1); } + /* read one message from backend */ pool_read(backend, &kind, 1); - ereport(DEBUG1, - (errmsg("inject_cached_message: push message kind: '%c'", kind))); - if (msg && - ((kind == 'T' && msg->type == POOL_DESCRIBE) || - (kind == '2' && msg->type == POOL_BIND))) + + /* + * Count up number of received messages to compare with the number of + * pending messages + */ + switch(kind) + { + case '1': /* parse complete */ + case '2': /* bind complete */ + case '3': /* close complete */ + case 'C': /* command complete */ + case 's': /* portal suspended */ + case 'T': /* row description */ + msg_cnt++; /* count up number of messages */ + elog(DEBUG1, "count up message %c msg_cnt: %d", kind, msg_cnt); + break; + case 'E': /* ErrorResponse */ + /* + * If we receive ErrorResponse, it is likely that the last + * Execute caused an error and we can stop reading messsages + * from backend. + */ + timeout = 0; + break; + default: + /* we do not count other messages */ + break; + } + + /* + * If msg count is greater than or equal to the number of pending + * messages, it is likely all necessary backend messages have been + * already seen. + */ + if (msg_cnt >= num_msgs) { - /* Pending message seen. Now it is likely to end of pending data */ + /* + * Set timeout to 0 so that we do not need to wait for responses + * from backend in vain. + */ timeout = 0; + elog(DEBUG1, "num_msgs: %d msg_cnt: %d", num_msgs, msg_cnt); } pool_push(backend, &kind, sizeof(kind)); pool_read(backend, &len, sizeof(len)); diff --git a/src/test/regression/tests/006.memqcache/expected.1 b/src/test/regression/tests/006.memqcache/expected.1 new file mode 100644 index 000000000..0475eca53 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/expected.1 @@ -0,0 +1,30 @@ +FE=> Parse(stmt="S0", query="SELECT 1") +FE=> Bind(stmt="S0", portal="P0") +FE=> Describe(portal="P0") +FE=> Execute(portal="P0") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="S1", query="SELECT 2") +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Bind(stmt="S0", portal="P2") +FE=> Describe(portal="P2") +FE=> Execute(portal="P2") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE ReadyForQuery(I) +FE=> Terminate diff --git a/src/test/regression/tests/006.memqcache/expected.2 b/src/test/regression/tests/006.memqcache/expected.2 new file mode 100644 index 000000000..11dbdadfe --- /dev/null +++ b/src/test/regression/tests/006.memqcache/expected.2 @@ -0,0 +1,25 @@ +FE=> Parse(stmt="S0", query="SELECT 1") +FE=> Bind(stmt="S0", portal="P0") +FE=> Describe(portal="P0") +FE=> Execute(portal="P0") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="S1", query="SELECT setseed(10)") +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Bind(stmt="S0", portal="P2") +FE=> Describe(portal="P2") +FE=> Execute(portal="P2") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE ErrorResponse(S ERROR V ERROR C 22023 M setseed parameter 10 is out of allowed range [-1,1] +<= BE ReadyForQuery(I) +FE=> Terminate diff --git a/src/test/regression/tests/006.memqcache/expected.3 b/src/test/regression/tests/006.memqcache/expected.3 new file mode 100644 index 000000000..05abf7833 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/expected.3 @@ -0,0 +1,32 @@ +FE=> Parse(stmt="S0", query="SELECT * FROM (VALUES(1),(2))") +FE=> Bind(stmt="S0", portal="P0") +FE=> Describe(portal="P0") +FE=> Execute(portal="P0") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE DataRow +<= BE CommandComplete(SELECT 2) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="S1", query="SELECT * FROM (VALUES(1),(2),(3))") +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Bind(stmt="S0", portal="P2") +FE=> Describe(portal="P2") +FE=> Execute(portal="P2") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE PortalSuspended +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE DataRow +<= BE CommandComplete(SELECT 2) +<= BE ReadyForQuery(I) +FE=> Terminate diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug1.data b/src/test/regression/tests/006.memqcache/query_cache_bug1.data new file mode 100644 index 000000000..dfc842882 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/query_cache_bug1.data @@ -0,0 +1,21 @@ +# "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']" +'P' "S0" "SELECT 1" 0 +'B' "P0" "S0" 0 0 0 +'D' 'P' "P0" +'E' "P0" 0 +'S' +'Y' + +'P' "S1" "SELECT 2" 0 +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 0 + +'B' "P2" "S0" 0 0 0 +'D' 'P' "P2" +'E' "P2" 0 + +'S' +'Y' + +'X' diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug2.data b/src/test/regression/tests/006.memqcache/query_cache_bug2.data new file mode 100644 index 000000000..e9e14ee65 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/query_cache_bug2.data @@ -0,0 +1,23 @@ +# test case including error response + +'P' "S0" "SELECT 1" 0 +'B' "P0" "S0" 0 0 0 +'D' 'P' "P0" +'E' "P0" 0 +'S' +'Y' + +# setseed parameter out of range error +'P' "S1" "SELECT setseed(10)" 0 +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 0 + +'B' "P2" "S0" 0 0 0 +'D' 'P' "P2" +'E' "P2" 0 + +'S' +'Y' + +'X' diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug3.data b/src/test/regression/tests/006.memqcache/query_cache_bug3.data new file mode 100644 index 000000000..790f21a3d --- /dev/null +++ b/src/test/regression/tests/006.memqcache/query_cache_bug3.data @@ -0,0 +1,21 @@ +# Portal suspended test + +'P' "S0" "SELECT * FROM (VALUES(1),(2))" 0 +'B' "P0" "S0" 0 0 0 +'D' 'P' "P0" +'E' "P0" 0 +'S' +'Y' + +'P' "S1" "SELECT * FROM (VALUES(1),(2),(3))" 0 +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 1 + +'B' "P2" "S0" 0 0 0 +'D' 'P' "P2" +'E' "P2" 0 +'S' +'Y' + +'X' diff --git a/src/test/regression/tests/006.memqcache/test.sh b/src/test/regression/tests/006.memqcache/test.sh index fa9f4b15e..fa8dd5567 100755 --- a/src/test/regression/tests/006.memqcache/test.sh +++ b/src/test/regression/tests/006.memqcache/test.sh @@ -519,4 +519,56 @@ EOF cd .. done +# +# Test for extended query protocol coner cases in streaming replication mode. +# These tests are basically for a sequence of extended queries: +# 1. execute a SELECT and create query cache entry. +# 2. sync. +# 3. execute another a SELECT. +# 4. execute bind and execute to use the query cache created at #1. +# 5. sync. + +rm -fr $TESTDIR +mkdir $TESTDIR +cd $TESTDIR + +# create test environment +echo -n "creating test environment..." +$PGPOOL_SETUP -m s -n 2 || exit 1 +echo "done." + +echo "memory_cache_enabled = on" >> etc/pgpool.conf +cd .. + +for i in 1 2 3 +do + # + # case 1: failed with kind mismatch error at #5. + # "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']" + # + # case 2: step #4 includes error (hung). + # + # case 3: step #4 includes PortalSuspended (hung). + cd $TESTDIR + ./startall + wait_for_pgpool_startup + timeout 1 $PGPROTO -d test -f ../query_cache_bug$i.data |& del_details_from_error > result + if [ $? != 0 ];then + # timeout happened or pgproto returned non 0 status + echo "test failed in test case #2 (timeout)" + err=true + ./shutdownall + exit 1 + fi + ./shutdownall + cd .. + diff -c expected.$i $TESTDIR/result > $log + if [ $? != 0 ];then + echo "test failed in test case $i" + cat $log + rm $log + exit 1 + fi +done + exit 0 -- 2.39.5