Fix yet another query cache bug in streaming replication mode.
authorTatsuo Ishii <ishii@postgresql.org>
Wed, 11 Dec 2024 09:31:02 +0000 (18:31 +0900)
committerTatsuo Ishii <ishii@postgresql.org>
Sat, 14 Dec 2024 11:41:04 +0000 (20:41 +0900)
If query cache is enabled and query is operated in extended query mode
and pgpool is running in streaming replication mode, an execute
message could return incorrect results.

This could happen when an execute message comes with a non 0 row
number parameter. In this case it fetches up to the specified number of
rows and returns "PortalSuspended" message. Pgpool-II does not create
query cache for this.  But if another execute message with 0 row
number parameter comes in, it fetches rest of rows (if any) and
creates query cache with the number of rows which the execute messages
fetched.

Obviously this causes unwanted results later on: another execute
messages returns result from query cache which has only number of rows
captured by the previous execute message with limited number of rows.

Another trouble is when multiple execute messages are sent
consecutively.  In this case Pgpool-II returned exactly the same
results from query cache for each execute message. This is wrong since
the second or subsequent executes should return 0 rows.

To fix this, new boolean fields "atEnd" and "partial_fetch" are
introduced in the query context. They are initialized to false when a
query context is created (also initialized when bind message is
received). If an execute message with 0 row number is executed, atEnd
is set to true upon receiving CommandComplete message. If an execute
message with non 0 row number is executed, partial_fetch is set to
true and never uses the cache result, nor creates query cache.

When atEnd is true, pgpool will return CommandComplete message with
"SELECT 0" as a result of the execute message.

Also tests for this case is added to the 006.memqcache regression
test.

Backpatch-through: v4.2
Discussion: [pgpool-hackers: 4547] Bug in query cache
https://www.pgpool.net/pipermail/pgpool-hackers/2024-December/004548.html

src/context/pool_query_context.c
src/include/context/pool_query_context.h
src/include/query_cache/pool_memqcache.h
src/protocol/CommandComplete.c
src/protocol/pool_proto_modules.c
src/query_cache/pool_memqcache.c
src/test/regression/tests/006.memqcache/expected.4 [new file with mode: 0644]
src/test/regression/tests/006.memqcache/query_cache_bug4.data [new file with mode: 0644]
src/test/regression/tests/006.memqcache/test.sh

index f484b77a756d193567ba52c7ca744ea7f2844c26..4762c758ef8af7ab9bad1bcd9cd5a565021c4c0c 100644 (file)
@@ -169,6 +169,8 @@ pool_start_query(POOL_QUERY_CONTEXT * query_context, char *query, int len, Node
                        query_context->temp_cache = pool_create_temp_query_cache(query);
                pool_set_query_in_progress();
                query_context->skip_cache_commit = false;
+               query_context->atEnd = false;
+               query_context->partial_fetch = false;
                session_context->query_context = query_context;
                MemoryContextSwitchTo(old_context);
        }
index d35c04986ece2b76279f63cfa42f09aff5c2838f..8ca7fb33637f289232de68be492c2002029b66ee 100644 (file)
@@ -90,6 +90,12 @@ typedef struct
                                                                         * extended query, do not commit cache if
                                                                         * this flag is true. */
 
+       bool            atEnd;                          /* if true all rows have been already
+                                                                        * fetched from the portal */
+
+       bool            partial_fetch;          /* if true some rows have been fetched by
+                                                                        * an execute with non 0 row option */
+
        MemoryContext memory_context;   /* memory context for query context */
 }                      POOL_QUERY_CONTEXT;
 
index 9607fb835d631cf0adf8f0d1c1763235a36202b0..aee7dd025afe0a272f58411f8f291ed5e0bcafd5 100644 (file)
@@ -250,7 +250,7 @@ extern uint32 hash_any(unsigned char *k, int keylen);
 
 extern POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
                                                                                                POOL_CONNECTION_POOL * backend,
-                                                                                               char *contents, bool *foundp);
+                                                                                               char *contents, bool use_fake_cache, bool *foundp);
 
 extern int pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len);
 extern int pool_catalog_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen);
@@ -277,7 +277,8 @@ extern POOL_QUERY_CACHE_ARRAY * pool_create_query_cache_array(void);
 extern void pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array);
 
 extern POOL_TEMP_QUERY_CACHE * pool_create_temp_query_cache(char *query);
-extern void pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state);
+extern void pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state,
+       bool partial_fetch);
 
 extern int     pool_init_memqcache_stats(void);
 extern POOL_QUERY_CACHE_STATS * pool_get_memqcache_stats(void);
index 8dbe3ebe5018029793b9b62b802eb06754953d48..0a3b286d86e0ccb52e85d9323221fc5feedbae15 100644 (file)
@@ -229,7 +229,19 @@ CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool
                        query = session_context->query_context->query_w_hex;
                        node = pool_get_parse_tree();
                        state = TSTATE(backend, MAIN_NODE_ID);
-                       pool_handle_query_cache(backend, query, node, state);
+
+                       /*
+                        * If some rows have been fetched by an execute with non 0 row option,
+                        * we do not create cache.
+                        */ 
+                       pool_handle_query_cache(backend, query, node, state,
+                                                                       session_context->query_context->partial_fetch);
+
+                       /*
+                        * CommandComplete guarantees that all rows have been fetched.  We
+                        * can unconditionally set atEnd flag.
+                        */
+                       session_context->query_context->atEnd = true;
                }
        }
 
index 6465fdb5ab2f8d5d2fc7219c1a6eb8a9a476dbfb..f93304f1221da911aa28ee76df703ab7ea9f613f 100644 (file)
@@ -275,7 +275,7 @@ SimpleQuery(POOL_CONNECTION * frontend,
                 * If the query is SELECT from table to cache, try to fetch cached
                 * result.
                 */
-               status = pool_fetch_from_memory_cache(frontend, backend, contents, &foundp);
+               status = pool_fetch_from_memory_cache(frontend, backend, contents, false, &foundp);
 
                if (status != POOL_CONTINUE)
                        return status;
@@ -921,6 +921,8 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
        POOL_QUERY_CONTEXT *query_context;
        POOL_SENT_MESSAGE *bind_msg;
        bool            foundp = false;
+       int                     num_rows;
+       char            *p;
 
        /* Get session context */
        session_context = pool_get_session_context(false);
@@ -932,6 +934,10 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
        ereport(DEBUG2,
                        (errmsg("Execute: portal name <%s>", contents)));
 
+       /* obtain number of returning rows */
+       p = contents + strlen(contents) + 1;
+       memcpy(&num_rows, p, sizeof(num_rows));
+
        bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED);
        if (!bind_msg)
                ereport(FATAL,
@@ -961,6 +967,18 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
        node = bind_msg->query_context->parse_tree;
        query = bind_msg->query_context->original_query;
 
+       /*
+        * If execute message's parameter is not 0, set partial_fetch flag to true
+        * so that subsequent execute message knows that the portal started with
+        * partial fetching.
+        */
+       if (num_rows != 0)
+       {
+               query_context->partial_fetch = true;
+               elog(DEBUG1, "set partial_fetch in execute");
+       }
+       elog(DEBUG1, "execute: partial_fetch: %d", query_context->partial_fetch);
+       
        strlcpy(query_string_buffer, query, sizeof(query_string_buffer));
 
        ereport(DEBUG2, (errmsg("Execute: query string = <%s>", query)));
@@ -974,11 +992,15 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
                ereport(LOG, (errmsg("statement: %s", query)));
 
        /*
-        * Fetch memory cache if possible
+        * Fetch memory cache if possible.  Also if atEnd is false or the execute
+        * message has 0 row argument, we maybe able to use cache.
+        * If partial_fetch is true, cannot use cache.
         */
        if (pool_config->memory_cache_enabled && !pool_is_writing_transaction() &&
                (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E')
-               && pool_is_likely_select(query) && !query_cache_disabled())
+               && pool_is_likely_select(query) && !query_cache_disabled() &&
+               (query_context->atEnd || num_rows == 0) &&
+               !query_context->partial_fetch)
        {
                POOL_STATUS status;
                char       *search_query = NULL;
@@ -1060,7 +1082,15 @@ Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
                 * If the query is SELECT from table to cache, try to fetch cached
                 * result.
                 */
-               status = pool_fetch_from_memory_cache(frontend, backend, search_query, &foundp);
+
+               /*
+                * If we are in streaming replication mode and use extended query
+                * protocol, pass the information atEnd which represents whether all
+                * rows in the portal has been already retrieved. If so,
+                * pool_fetch_from_memory_cache will return "CommandComplete 0" cache.
+                */
+               status = pool_fetch_from_memory_cache(frontend, backend, search_query,
+                                                                                         query_context->atEnd, &foundp);
 
                if (status != POOL_CONTINUE)
                        return status;
@@ -1683,6 +1713,13 @@ Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
                                 errdetail("cannot get the query context")));
        }
 
+       /*
+        * Since now that fresh portal is created, we reset atEnd and
+        * partial_fetch flag.
+        */
+       query_context->atEnd = false;
+       query_context->partial_fetch = false;
+
        /*
         * If the query can be cached, save its offset of query text in bind
         * message's content.
@@ -2335,7 +2372,7 @@ ReadyForQuery(POOL_CONNECTION * frontend,
                                        if (session_context->query_context &&
                                                session_context->query_context->query_state[MAIN_NODE_ID] == POOL_EXECUTE_COMPLETE)
                                        {
-                                               pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state);
+                                               pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state, false);
                                                if (session_context->query_context->query_w_hex)
                                                        pfree(session_context->query_context->query_w_hex);
                                                session_context->query_context->query_w_hex = NULL;
@@ -2348,7 +2385,7 @@ ReadyForQuery(POOL_CONNECTION * frontend,
                                                state = 'I';    /* XXX I don't think query cache works
                                                                                 * with PROTO2 protocol */
                                        }
-                                       pool_handle_query_cache(backend, query, node, state);
+                                       pool_handle_query_cache(backend, query, node, state, false);
                                }
                        }
                }
index a12557ad1e111cbf025890ce184d92d907179523..37db5cb9d519e50ea144ad85d1898d11aa93cbf9 100644 (file)
@@ -125,6 +125,7 @@ static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
 static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element);
 static bool is_free_hash_element(void);
 static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen);
+static char *create_fake_cache(size_t *len);
 
 /*
  * if true, shared memory is locked in this process now.
@@ -726,11 +727,13 @@ delete_cache_on_memcached(const char *key)
 
 /*
  * Fetch SELECT data from cache if possible.
+ *
+ * If use_fake_cache is true, make up "CommandComplete 0" result and use it.
  */
 POOL_STATUS
 pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
                                                         POOL_CONNECTION_POOL * backend,
-                                                        char *contents, bool *foundp)
+                                                        char *contents, bool use_fake_cache, bool *foundp)
 {
        char       *qcache;
        size_t          qcachelen;
@@ -779,6 +782,16 @@ pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
 
                session_context = pool_get_session_context(true);
                target_backend = CONNECTION(backend, session_context->load_balance_node_id);
+
+               /*
+                * If use_fake_cache is true, make up "CommandComplete (0)" response.
+                */
+               if (use_fake_cache)
+               {
+                       pfree(qcache);
+                       qcache = create_fake_cache(&qcachelen);
+                       elog(DEBUG2, "fake_cach: len: %ld", qcachelen);
+               }
                inject_cached_message(target_backend, qcache, qcachelen);
        }
        else
@@ -827,6 +840,29 @@ pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
        return POOL_CONTINUE;
 }
 
+/*
+ * Make up response packet for "CommandComplete (SELECT 0)" message.
+ */
+static char *
+create_fake_cache(size_t *len)
+{
+       char            *qcache, *p;
+       int32           mlen;   /* message length including self */
+       static          char*   msg = "SELECT 0";
+
+       *len = sizeof(char)     +       /* message kind */
+               sizeof(int32) +         /* packet length including self */
+               sizeof(msg);    /* Command Complete message with 0 row returned */
+       mlen = *len - 1;        /* message length does not include message kind */
+       mlen = htonl(mlen);
+       p = qcache = palloc(*len);
+       *p++ = 'C';
+       memcpy(p, &mlen, sizeof(mlen));
+       p += sizeof(mlen);
+       strncpy(p, msg, strlen(msg) + 1);
+       return qcache;
+}
+
 /*
  * Simple and rough (thus unreliable) check if the query is likely
  * SELECT. Just check if the query starts with SELECT or WITH. This
@@ -3616,7 +3652,8 @@ pool_check_and_discard_cache_buffer(int num_oids, int *oids)
  * For other case At Ready for Query handle query cache.
  */
 void
-pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state)
+pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state,
+                                               bool partial_fetch)
 {
        POOL_SESSION_CONTEXT *session_context;
        pool_sigset_t oldmask;
@@ -3629,7 +3666,7 @@ pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node,
        session_context = pool_get_session_context(true);
 
        /* Ok to cache SELECT result? */
-       if (pool_is_cache_safe() && !query_cache_disabled())
+       if (!partial_fetch && pool_is_cache_safe() && !query_cache_disabled())
        {
                SelectContext ctx;
                MemoryContext old_context;
@@ -3746,6 +3783,12 @@ pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node,
                /* Discard buffered data */
                pool_reset_memqcache_buffer(true);
        }
+       else if (partial_fetch)         /* cannot create cache because of partial fetch */
+       {
+               /* Discard buffered data */
+               pool_reset_memqcache_buffer(true);
+       }
+
        else if (is_commit_query(node)) /* Commit? */
        {
                int                     num_caches;
diff --git a/src/test/regression/tests/006.memqcache/expected.4 b/src/test/regression/tests/006.memqcache/expected.4
new file mode 100644 (file)
index 0000000..399afcc
--- /dev/null
@@ -0,0 +1,51 @@
+FE=> Parse(stmt="", query="SELECT * FROM (VALUES(1),(2)) v")
+FE=> Bind(stmt="", portal="")
+FE=> Describe(portal="")
+FE=> Execute(portal="")
+FE=> Execute(portal="")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE PortalSuspended
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="", query="SELECT * FROM (VALUES(1),(2)) v")
+FE=> Bind(stmt="", portal="")
+FE=> Describe(portal="")
+FE=> Execute(portal="")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT * FROM (VALUES(1),(2)) v")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Sync
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE CommandComplete(SELECT 0)
+<= BE ReadyForQuery(I)
+FE=> Terminate
diff --git a/src/test/regression/tests/006.memqcache/query_cache_bug4.data b/src/test/regression/tests/006.memqcache/query_cache_bug4.data
new file mode 100644 (file)
index 0000000..ba4d71d
--- /dev/null
@@ -0,0 +1,37 @@
+# Since the first Eexecute returns portal suspended, the second
+# E does not create cache.
+'P'    ""      "SELECT * FROM (VALUES(1),(2)) v"       0
+'B'    ""      ""      0       0       0   
+'D'    'P'     ""
+'E'    ""      1
+'E'    ""      0
+'S'
+'Y'
+
+# Since no cache was created, two rows are returned,
+# and cache created.
+'P'    ""      "SELECT * FROM (VALUES(1),(2)) v"       0
+'B'    ""      ""      0       0       0   
+'D'    'P'     ""
+'E'    ""      0
+'S'
+'Y'
+
+# This time cache is used. 2 rows are returned.
+'P'    "S1"    "SELECT * FROM (VALUES(1),(2)) v"       0
+'B'    "P1"    "S1"    0       0       0   
+'D'    'P'     "P1"
+'E'    "P1"    0
+'S'
+'Y'
+
+# cache is used, 2 rows are returned.
+# but the second E returns 0 rows.
+'B'    "P1"    "S1"    0       0       0   
+'D'    'P'     "P1"
+'E'    "P1"    0
+'E'    "P1"    0
+'S'
+'Y'
+
+'X'
index db91121518f49944eacc37241f75b19d8b399cfd..bc0003293e211951a84d914a2845c3e800ce059d 100755 (executable)
@@ -488,7 +488,7 @@ echo "done."
 echo "memory_cache_enabled = on" >> etc/pgpool.conf
 cd ..
 
-for i in 1 2 3
+for i in 1 2 3 4 4
 do
     #
     # case 1: failed with kind mismatch error at #5.
@@ -497,6 +497,10 @@ do
     # case 2: step #4 includes error (hung).
     #
     # case 3: step #4 includes PortalSuspended (hung).
+    #
+    # case 4: various cases including portal suspended
+    # Note that case4 is executed twice to make sure that
+    # the test works for either query cache exists or does not exist
     cd $TESTDIR
     ./startall
     wait_for_pgpool_startup