From 2a99aa5d1910f1fd4855c8eb6751a26cbaa5e48d Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Wed, 11 Dec 2024 18:31:02 +0900 Subject: [PATCH] Fix yet another query cache bug in streaming replication mode. If query cache is enabled and query is operated in extended query mode and pgpool is running in streaming replication mode, an execute message could return incorrect results. This could happen when an execute message comes with a non 0 row number parameter. In this case it fetches up to the specified number of rows and returns "PortalSuspended" message. Pgpool-II does not create query cache for this. But if another execute message with 0 row number parameter comes in, it fetches rest of rows (if any) and creates query cache with the number of rows which the execute messages fetched. Obviously this causes unwanted results later on: another execute messages returns result from query cache which has only number of rows captured by the previous execute message with limited number of rows. Another trouble is when multiple execute messages are sent consecutively. In this case Pgpool-II returned exactly the same results from query cache for each execute message. This is wrong since the second or subsequent executes should return 0 rows. To fix this, new boolean fields "atEnd" and "partial_fetch" are introduced in the query context. They are initialized to false when a query context is created (also initialized when bind message is received). If an execute message with 0 row number is executed, atEnd is set to true upon receiving CommandComplete message. If an execute message with non 0 row number is executed, partial_fetch is set to true and never uses the cache result, nor creates query cache. When atEnd is true, pgpool will return CommandComplete message with "SELECT 0" as a result of the execute message. Also tests for this case is added to the 006.memqcache regression test. Backpatch-through: v4.2 Discussion: [pgpool-hackers: 4547] Bug in query cache https://www.pgpool.net/pipermail/pgpool-hackers/2024-December/004548.html --- src/context/pool_query_context.c | 2 + src/include/context/pool_query_context.h | 6 +++ src/include/query_cache/pool_memqcache.h | 5 +- src/protocol/CommandComplete.c | 14 ++++- src/protocol/pool_proto_modules.c | 49 +++++++++++++++--- src/query_cache/pool_memqcache.c | 49 ++++++++++++++++-- .../regression/tests/006.memqcache/expected.4 | 51 +++++++++++++++++++ .../tests/006.memqcache/query_cache_bug4.data | 37 ++++++++++++++ .../regression/tests/006.memqcache/test.sh | 6 ++- 9 files changed, 206 insertions(+), 13 deletions(-) create mode 100644 src/test/regression/tests/006.memqcache/expected.4 create mode 100644 src/test/regression/tests/006.memqcache/query_cache_bug4.data diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c index f484b77a7..4762c758e 100644 --- a/src/context/pool_query_context.c +++ b/src/context/pool_query_context.c @@ -169,6 +169,8 @@ pool_start_query(POOL_QUERY_CONTEXT * query_context, char *query, int len, Node query_context->temp_cache = pool_create_temp_query_cache(query); pool_set_query_in_progress(); query_context->skip_cache_commit = false; + query_context->atEnd = false; + query_context->partial_fetch = false; session_context->query_context = query_context; MemoryContextSwitchTo(old_context); } diff --git a/src/include/context/pool_query_context.h b/src/include/context/pool_query_context.h index d35c04986..8ca7fb336 100644 --- a/src/include/context/pool_query_context.h +++ b/src/include/context/pool_query_context.h @@ -90,6 +90,12 @@ typedef struct * extended query, do not commit cache if * this flag is true. */ + bool atEnd; /* if true all rows have been already + * fetched from the portal */ + + bool partial_fetch; /* if true some rows have been fetched by + * an execute with non 0 row option */ + MemoryContext memory_context; /* memory context for query context */ } POOL_QUERY_CONTEXT; diff --git a/src/include/query_cache/pool_memqcache.h b/src/include/query_cache/pool_memqcache.h index 9607fb835..aee7dd025 100644 --- a/src/include/query_cache/pool_memqcache.h +++ b/src/include/query_cache/pool_memqcache.h @@ -250,7 +250,7 @@ extern uint32 hash_any(unsigned char *k, int keylen); extern POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, - char *contents, bool *foundp); + char *contents, bool use_fake_cache, bool *foundp); extern int pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len); extern int pool_catalog_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen); @@ -277,7 +277,8 @@ extern POOL_QUERY_CACHE_ARRAY * pool_create_query_cache_array(void); extern void pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array); extern POOL_TEMP_QUERY_CACHE * pool_create_temp_query_cache(char *query); -extern void pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state); +extern void pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state, + bool partial_fetch); extern int pool_init_memqcache_stats(void); extern POOL_QUERY_CACHE_STATS * pool_get_memqcache_stats(void); diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c index 8dbe3ebe5..0a3b286d8 100644 --- a/src/protocol/CommandComplete.c +++ b/src/protocol/CommandComplete.c @@ -229,7 +229,19 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool query = session_context->query_context->query_w_hex; node = pool_get_parse_tree(); state = TSTATE(backend, MAIN_NODE_ID); - pool_handle_query_cache(backend, query, node, state); + + /* + * If some rows have been fetched by an execute with non 0 row option, + * we do not create cache. + */ + pool_handle_query_cache(backend, query, node, state, + session_context->query_context->partial_fetch); + + /* + * CommandComplete guarantees that all rows have been fetched. We + * can unconditionally set atEnd flag. + */ + session_context->query_context->atEnd = true; } } diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c index 6465fdb5a..f93304f12 100644 --- a/src/protocol/pool_proto_modules.c +++ b/src/protocol/pool_proto_modules.c @@ -275,7 +275,7 @@ SimpleQuery(POOL_CONNECTION * frontend, * If the query is SELECT from table to cache, try to fetch cached * result. */ - status = pool_fetch_from_memory_cache(frontend, backend, contents, &foundp); + status = pool_fetch_from_memory_cache(frontend, backend, contents, false, &foundp); if (status != POOL_CONTINUE) return status; @@ -921,6 +921,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, POOL_QUERY_CONTEXT *query_context; POOL_SENT_MESSAGE *bind_msg; bool foundp = false; + int num_rows; + char *p; /* Get session context */ session_context = pool_get_session_context(false); @@ -932,6 +934,10 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, ereport(DEBUG2, (errmsg("Execute: portal name <%s>", contents))); + /* obtain number of returning rows */ + p = contents + strlen(contents) + 1; + memcpy(&num_rows, p, sizeof(num_rows)); + bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED); if (!bind_msg) ereport(FATAL, @@ -961,6 +967,18 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, node = bind_msg->query_context->parse_tree; query = bind_msg->query_context->original_query; + /* + * If execute message's parameter is not 0, set partial_fetch flag to true + * so that subsequent execute message knows that the portal started with + * partial fetching. + */ + if (num_rows != 0) + { + query_context->partial_fetch = true; + elog(DEBUG1, "set partial_fetch in execute"); + } + elog(DEBUG1, "execute: partial_fetch: %d", query_context->partial_fetch); + strlcpy(query_string_buffer, query, sizeof(query_string_buffer)); ereport(DEBUG2, (errmsg("Execute: query string = <%s>", query))); @@ -974,11 +992,15 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, ereport(LOG, (errmsg("statement: %s", query))); /* - * Fetch memory cache if possible + * Fetch memory cache if possible. Also if atEnd is false or the execute + * message has 0 row argument, we maybe able to use cache. + * If partial_fetch is true, cannot use cache. */ if (pool_config->memory_cache_enabled && !pool_is_writing_transaction() && (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E') - && pool_is_likely_select(query) && !query_cache_disabled()) + && pool_is_likely_select(query) && !query_cache_disabled() && + (query_context->atEnd || num_rows == 0) && + !query_context->partial_fetch) { POOL_STATUS status; char *search_query = NULL; @@ -1060,7 +1082,15 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, * If the query is SELECT from table to cache, try to fetch cached * result. */ - status = pool_fetch_from_memory_cache(frontend, backend, search_query, &foundp); + + /* + * If we are in streaming replication mode and use extended query + * protocol, pass the information atEnd which represents whether all + * rows in the portal has been already retrieved. If so, + * pool_fetch_from_memory_cache will return "CommandComplete 0" cache. + */ + status = pool_fetch_from_memory_cache(frontend, backend, search_query, + query_context->atEnd, &foundp); if (status != POOL_CONTINUE) return status; @@ -1683,6 +1713,13 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, errdetail("cannot get the query context"))); } + /* + * Since now that fresh portal is created, we reset atEnd and + * partial_fetch flag. + */ + query_context->atEnd = false; + query_context->partial_fetch = false; + /* * If the query can be cached, save its offset of query text in bind * message's content. @@ -2335,7 +2372,7 @@ ReadyForQuery(POOL_CONNECTION * frontend, if (session_context->query_context && session_context->query_context->query_state[MAIN_NODE_ID] == POOL_EXECUTE_COMPLETE) { - pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state); + pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state, false); if (session_context->query_context->query_w_hex) pfree(session_context->query_context->query_w_hex); session_context->query_context->query_w_hex = NULL; @@ -2348,7 +2385,7 @@ ReadyForQuery(POOL_CONNECTION * frontend, state = 'I'; /* XXX I don't think query cache works * with PROTO2 protocol */ } - pool_handle_query_cache(backend, query, node, state); + pool_handle_query_cache(backend, query, node, state, false); } } } diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c index a12557ad1..37db5cb9d 100644 --- a/src/query_cache/pool_memqcache.c +++ b/src/query_cache/pool_memqcache.c @@ -125,6 +125,7 @@ static volatile POOL_HASH_ELEMENT *get_new_hash_element(void); static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element); static bool is_free_hash_element(void); static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen); +static char *create_fake_cache(size_t *len); /* * if true, shared memory is locked in this process now. @@ -726,11 +727,13 @@ delete_cache_on_memcached(const char *key) /* * Fetch SELECT data from cache if possible. + * + * If use_fake_cache is true, make up "CommandComplete 0" result and use it. */ POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, - char *contents, bool *foundp) + char *contents, bool use_fake_cache, bool *foundp) { char *qcache; size_t qcachelen; @@ -779,6 +782,16 @@ pool_fetch_from_memory_cache(POOL_CONNECTION * frontend, session_context = pool_get_session_context(true); target_backend = CONNECTION(backend, session_context->load_balance_node_id); + + /* + * If use_fake_cache is true, make up "CommandComplete (0)" response. + */ + if (use_fake_cache) + { + pfree(qcache); + qcache = create_fake_cache(&qcachelen); + elog(DEBUG2, "fake_cach: len: %ld", qcachelen); + } inject_cached_message(target_backend, qcache, qcachelen); } else @@ -827,6 +840,29 @@ pool_fetch_from_memory_cache(POOL_CONNECTION * frontend, return POOL_CONTINUE; } +/* + * Make up response packet for "CommandComplete (SELECT 0)" message. + */ +static char * +create_fake_cache(size_t *len) +{ + char *qcache, *p; + int32 mlen; /* message length including self */ + static char* msg = "SELECT 0"; + + *len = sizeof(char) + /* message kind */ + sizeof(int32) + /* packet length including self */ + sizeof(msg); /* Command Complete message with 0 row returned */ + mlen = *len - 1; /* message length does not include message kind */ + mlen = htonl(mlen); + p = qcache = palloc(*len); + *p++ = 'C'; + memcpy(p, &mlen, sizeof(mlen)); + p += sizeof(mlen); + strncpy(p, msg, strlen(msg) + 1); + return qcache; +} + /* * Simple and rough (thus unreliable) check if the query is likely * SELECT. Just check if the query starts with SELECT or WITH. This @@ -3616,7 +3652,8 @@ pool_check_and_discard_cache_buffer(int num_oids, int *oids) * For other case At Ready for Query handle query cache. */ void -pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state) +pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state, + bool partial_fetch) { POOL_SESSION_CONTEXT *session_context; pool_sigset_t oldmask; @@ -3629,7 +3666,7 @@ pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, session_context = pool_get_session_context(true); /* Ok to cache SELECT result? */ - if (pool_is_cache_safe() && !query_cache_disabled()) + if (!partial_fetch && pool_is_cache_safe() && !query_cache_disabled()) { SelectContext ctx; MemoryContext old_context; @@ -3746,6 +3783,12 @@ pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, /* Discard buffered data */ pool_reset_memqcache_buffer(true); } + else if (partial_fetch) /* cannot create cache because of partial fetch */ + { + /* Discard buffered data */ + pool_reset_memqcache_buffer(true); + } + else if (is_commit_query(node)) /* Commit? */ { int num_caches; diff --git a/src/test/regression/tests/006.memqcache/expected.4 b/src/test/regression/tests/006.memqcache/expected.4 new file mode 100644 index 000000000..399afcc31 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/expected.4 @@ -0,0 +1,51 @@ +FE=> Parse(stmt="", query="SELECT * FROM (VALUES(1),(2)) v") +FE=> Bind(stmt="", portal="") +FE=> Describe(portal="") +FE=> Execute(portal="") +FE=> Execute(portal="") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE PortalSuspended +<= BE DataRow +<= BE CommandComplete(SELECT 1) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="", query="SELECT * FROM (VALUES(1),(2)) v") +FE=> Bind(stmt="", portal="") +FE=> Describe(portal="") +FE=> Execute(portal="") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE DataRow +<= BE CommandComplete(SELECT 2) +<= BE ReadyForQuery(I) +FE=> Parse(stmt="S1", query="SELECT * FROM (VALUES(1),(2)) v") +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Sync +<= BE ParseComplete +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE DataRow +<= BE CommandComplete(SELECT 2) +<= BE ReadyForQuery(I) +FE=> Bind(stmt="S1", portal="P1") +FE=> Describe(portal="P1") +FE=> Execute(portal="P1") +FE=> Execute(portal="P1") +FE=> Sync +<= BE BindComplete +<= BE RowDescription +<= BE DataRow +<= BE DataRow +<= BE CommandComplete(SELECT 2) +<= BE CommandComplete(SELECT 0) +<= BE ReadyForQuery(I) +FE=> Terminate diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug4.data b/src/test/regression/tests/006.memqcache/query_cache_bug4.data new file mode 100644 index 000000000..ba4d71dc8 --- /dev/null +++ b/src/test/regression/tests/006.memqcache/query_cache_bug4.data @@ -0,0 +1,37 @@ +# Since the first Eexecute returns portal suspended, the second +# E does not create cache. +'P' "" "SELECT * FROM (VALUES(1),(2)) v" 0 +'B' "" "" 0 0 0 +'D' 'P' "" +'E' "" 1 +'E' "" 0 +'S' +'Y' + +# Since no cache was created, two rows are returned, +# and cache created. +'P' "" "SELECT * FROM (VALUES(1),(2)) v" 0 +'B' "" "" 0 0 0 +'D' 'P' "" +'E' "" 0 +'S' +'Y' + +# This time cache is used. 2 rows are returned. +'P' "S1" "SELECT * FROM (VALUES(1),(2)) v" 0 +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 0 +'S' +'Y' + +# cache is used, 2 rows are returned. +# but the second E returns 0 rows. +'B' "P1" "S1" 0 0 0 +'D' 'P' "P1" +'E' "P1" 0 +'E' "P1" 0 +'S' +'Y' + +'X' diff --git a/src/test/regression/tests/006.memqcache/test.sh b/src/test/regression/tests/006.memqcache/test.sh index db9112151..bc0003293 100755 --- a/src/test/regression/tests/006.memqcache/test.sh +++ b/src/test/regression/tests/006.memqcache/test.sh @@ -488,7 +488,7 @@ echo "done." echo "memory_cache_enabled = on" >> etc/pgpool.conf cd .. -for i in 1 2 3 +for i in 1 2 3 4 4 do # # case 1: failed with kind mismatch error at #5. @@ -497,6 +497,10 @@ do # case 2: step #4 includes error (hung). # # case 3: step #4 includes PortalSuspended (hung). + # + # case 4: various cases including portal suspended + # Note that case4 is executed twice to make sure that + # the test works for either query cache exists or does not exist cd $TESTDIR ./startall wait_for_pgpool_startup -- 2.39.5