diff options
author | Tatsuo Ishii | 2024-12-04 12:38:23 +0000 |
---|---|---|
committer | Tatsuo Ishii | 2024-12-05 04:37:03 +0000 |
commit | 4dd7371c2c924daa3544732fc5342d2cf24c8956 (patch) | |
tree | 4da58f40c5e8bbd625c274e44a7bbac0936bf203 | |
parent | 5c378d0d8b94efed9dc2e2bf63e9482318eb1e06 (diff) |
Fix query cache bug in streaming replication mode.
When query cache is enabled and an execute message is sent from
frontend, pgpool injects query cache data into backend message buffer
if query cache data is available. inject_cached_message() is
responsible for the task. But it had an oversight if the message
stream from frontend includes more than one set of bind or describe
messages before a sync message. It tried to determine the frontend
message end by finding a bind complete or a row description message
from backend. But in that case, it is possible that these messages do
not indicate the message stream end because there is one more bind
complete or row description message. As a result the cached message was
inserted at an inappropriate position and pgpool mistakenly raised a "kind
mismatch" error.
This commit changes the algorithm to detect the message stream end:
compare the number of messages from backend with the pending message
queue length. When a message is read from backend, the counter for the
number of messages is counted up if the message is one of parse
complete, bind complete, close complete, command complete, portal
suspended or row description. For other message types the counter is
not counted up. If the counter reaches the pending message queue
length, we are at the end of the message stream and inject the cached
messages.
Test cases for 006.memqcache are added.
Backpatch-through: v4.2.
-rw-r--r-- | src/query_cache/pool_memqcache.c | 58 | ||||
-rw-r--r-- | src/test/regression/tests/006.memqcache/expected.1 | 30 | ||||
-rw-r--r-- | src/test/regression/tests/006.memqcache/expected.2 | 25 | ||||
-rw-r--r-- | src/test/regression/tests/006.memqcache/expected.3 | 32 | ||||
-rw-r--r-- | src/test/regression/tests/006.memqcache/query_cache_bug1.data | 21 | ||||
-rw-r--r-- | src/test/regression/tests/006.memqcache/query_cache_bug2.data | 23 | ||||
-rw-r--r-- | src/test/regression/tests/006.memqcache/query_cache_bug3.data | 21 | ||||
-rwxr-xr-x | src/test/regression/tests/006.memqcache/test.sh | 52 |
8 files changed, 252 insertions, 10 deletions
diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c index 7f824fe22..35c0f56ed 100644 --- a/src/query_cache/pool_memqcache.c +++ b/src/query_cache/pool_memqcache.c @@ -4600,21 +4600,24 @@ inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen) int i = 0; bool is_prepared_stmt = false; POOL_SESSION_CONTEXT *session_context; - POOL_QUERY_CONTEXT *query_context; POOL_PENDING_MESSAGE *msg; + int num_msgs; + int msg_cnt = 0; session_context = pool_get_session_context(false); - query_context = session_context->query_context; - msg = pool_pending_message_find_lastest_by_query_context(query_context); + msg = pool_pending_message_head_message(); + num_msgs = list_length(session_context->pending_messages); if (msg) { /* - * If pending message found, we should extract target backend from it + * If pending message found, we should extract data from the target + * backend. */ int backend_id; backend_id = pool_pending_message_get_target_backend_id(msg); + pool_pending_message_free_pending_message(msg); backend = CONNECTION(session_context->backend, backend_id); timeout = -1; } @@ -4645,15 +4648,50 @@ inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen) pool_set_timeout(-1); } + /* read one message from backend */ pool_read(backend, &kind, 1); - ereport(DEBUG1, - (errmsg("inject_cached_message: push message kind: '%c'", kind))); - if (msg && - ((kind == 'T' && msg->type == POOL_DESCRIBE) || - (kind == '2' && msg->type == POOL_BIND))) + + /* + * Count up number of received messages to compare with the number of + * pending messages + */ + switch(kind) + { + case '1': /* parse complete */ + case '2': /* bind complete */ + case '3': /* close complete */ + case 'C': /* command complete */ + case 's': /* portal suspended */ + case 'T': /* row description */ + msg_cnt++; /* count up number of messages */ + elog(DEBUG1, "count up message %c msg_cnt: %d", kind, msg_cnt); + break; + case 'E': /* ErrorResponse */ + 
/* + * If we receive ErrorResponse, it is likely that the last + * Execute caused an error and we can stop reading messsages + * from backend. + */ + timeout = 0; + break; + default: + /* we do not count other messages */ + break; + } + + /* + * If msg count is greater than or equal to the number of pending + * messages, it is likely all necessary backend messages have been + * already seen. + */ + if (msg_cnt >= num_msgs) { - /* Pending message seen. Now it is likely to end of pending data */ + /* + * Set timeout to 0 so that we do not need to wait for responses + * from backend in vain. + */ timeout = 0; + elog(DEBUG1, "num_msgs: %d msg_cnt: %d", num_msgs, msg_cnt); } pool_push(backend, &kind, sizeof(kind)); pool_read(backend, &len, sizeof(len)); diff --git a/src/test/regression/tests/006.memqcache/expected.1 b/src/test/regression/tests/006.memqcache/expected.1 new file mode 100644 index 000000000..0475eca53 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/expected.1 @@ -0,0 +1,30 @@ +FE=> Parse(stmt="S0", query="SELECT 1") +FE=> Bind(stmt="S0", portal="P0") +FE=> Describe(portal="P0") +FE=> Execute(portal="P0") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="S1", query="SELECT 2") +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Bind(stmt="S0", portal="P2") +FE=> Describe(portal="P2") +FE=> Execute(portal="P2") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE ReadyForQuery(I) +FE=> Terminate diff --git a/src/test/regression/tests/006.memqcache/expected.2 b/src/test/regression/tests/006.memqcache/expected.2 new file mode 100644 index 000000000..11dbdadfe --- /dev/null +++ b/src/test/regression/tests/006.memqcache/expected.2 
@@ -0,0 +1,25 @@ +FE=> Parse(stmt="S0", query="SELECT 1") +FE=> Bind(stmt="S0", portal="P0") +FE=> Describe(portal="P0") +FE=> Execute(portal="P0") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="S1", query="SELECT setseed(10)") +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Bind(stmt="S0", portal="P2") +FE=> Describe(portal="P2") +FE=> Execute(portal="P2") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE ErrorResponse(S ERROR V ERROR C 22023 M setseed parameter 10 is out of allowed range [-1,1] +<= BE ReadyForQuery(I) +FE=> Terminate diff --git a/src/test/regression/tests/006.memqcache/expected.3 b/src/test/regression/tests/006.memqcache/expected.3 new file mode 100644 index 000000000..05abf7833 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/expected.3 @@ -0,0 +1,32 @@ +FE=> Parse(stmt="S0", query="SELECT * FROM (VALUES(1),(2))") +FE=> Bind(stmt="S0", portal="P0") +FE=> Describe(portal="P0") +FE=> Execute(portal="P0") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE DataRow +<= BE CommandComplete(SELECT 2) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="S1", query="SELECT * FROM (VALUES(1),(2),(3))") +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Bind(stmt="S0", portal="P2") +FE=> Describe(portal="P2") +FE=> Execute(portal="P2") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE PortalSuspended +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE DataRow +<= BE CommandComplete(SELECT 2) +<= BE ReadyForQuery(I) +FE=> Terminate diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug1.data b/src/test/regression/tests/006.memqcache/query_cache_bug1.data new file mode 100644 index 000000000..dfc842882 --- 
/dev/null +++ b/src/test/regression/tests/006.memqcache/query_cache_bug1.data @@ -0,0 +1,21 @@ +# "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']" +'P' "S0" "SELECT 1" 0 +'B' "P0" "S0" 0 0 0 +'D' 'P' "P0" +'E' "P0" 0 +'S' +'Y' + +'P' "S1" "SELECT 2" 0 +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 0 + +'B' "P2" "S0" 0 0 0 +'D' 'P' "P2" +'E' "P2" 0 + +'S' +'Y' + +'X' diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug2.data b/src/test/regression/tests/006.memqcache/query_cache_bug2.data new file mode 100644 index 000000000..e9e14ee65 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/query_cache_bug2.data @@ -0,0 +1,23 @@ +# test case including error response + +'P' "S0" "SELECT 1" 0 +'B' "P0" "S0" 0 0 0 +'D' 'P' "P0" +'E' "P0" 0 +'S' +'Y' + +# setseed parameter out of range error +'P' "S1" "SELECT setseed(10)" 0 +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 0 + +'B' "P2" "S0" 0 0 0 +'D' 'P' "P2" +'E' "P2" 0 + +'S' +'Y' + +'X' diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug3.data b/src/test/regression/tests/006.memqcache/query_cache_bug3.data new file mode 100644 index 000000000..790f21a3d --- /dev/null +++ b/src/test/regression/tests/006.memqcache/query_cache_bug3.data @@ -0,0 +1,21 @@ +# Portal suspended test + +'P' "S0" "SELECT * FROM (VALUES(1),(2))" 0 +'B' "P0" "S0" 0 0 0 +'D' 'P' "P0" +'E' "P0" 0 +'S' +'Y' + +'P' "S1" "SELECT * FROM (VALUES(1),(2),(3))" 0 +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 1 + +'B' "P2" "S0" 0 0 0 +'D' 'P' "P2" +'E' "P2" 0 +'S' +'Y' + +'X' diff --git a/src/test/regression/tests/006.memqcache/test.sh b/src/test/regression/tests/006.memqcache/test.sh index fa9f4b15e..fa8dd5567 100755 --- a/src/test/regression/tests/006.memqcache/test.sh +++ b/src/test/regression/tests/006.memqcache/test.sh @@ -519,4 +519,56 @@ EOF cd .. done +# +# Test for extended query protocol coner cases in streaming replication mode. 
+# These tests are basically for a sequence of extended queries: +# 1. execute a SELECT and create query cache entry. +# 2. sync. +# 3. execute another a SELECT. +# 4. execute bind and execute to use the query cache created at #1. +# 5. sync. + +rm -fr $TESTDIR +mkdir $TESTDIR +cd $TESTDIR + +# create test environment +echo -n "creating test environment..." +$PGPOOL_SETUP -m s -n 2 || exit 1 +echo "done." + +echo "memory_cache_enabled = on" >> etc/pgpool.conf +cd .. + +for i in 1 2 3 +do + # + # case 1: failed with kind mismatch error at #5. + # "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']" + # + # case 2: step #4 includes error (hung). + # + # case 3: step #4 includes PortalSuspended (hung). + cd $TESTDIR + ./startall + wait_for_pgpool_startup + timeout 1 $PGPROTO -d test -f ../query_cache_bug$i.data |& del_details_from_error > result + if [ $? != 0 ];then + # timeout happened or pgproto returned non 0 status + echo "test failed in test case #2 (timeout)" + err=true + ./shutdownall + exit 1 + fi + ./shutdownall + cd .. + diff -c expected.$i $TESTDIR/result > $log + if [ $? != 0 ];then + echo "test failed in test case $i" + cat $log + rm $log + exit 1 + fi +done + exit 0 |