query_context->temp_cache = pool_create_temp_query_cache(query);
pool_set_query_in_progress();
query_context->skip_cache_commit = false;
+ query_context->atEnd = false;
+ query_context->partial_fetch = false;
session_context->query_context = query_context;
MemoryContextSwitchTo(old_context);
}
* extended query, do not commit cache if
* this flag is true. */
+ bool atEnd; /* if true all rows have been already
+ * fetched from the portal */
+
+ bool partial_fetch; /* if true some rows have been fetched by
+ * an execute with non 0 row option */
+
MemoryContext memory_context; /* memory context for query context */
} POOL_QUERY_CONTEXT;
extern POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
POOL_CONNECTION_POOL * backend,
- char *contents, bool *foundp);
+ char *contents, bool use_fake_cache, bool *foundp);
extern int pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len);
extern int pool_catalog_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen);
extern void pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array);
extern POOL_TEMP_QUERY_CACHE * pool_create_temp_query_cache(char *query);
-extern void pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state);
+extern void pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state,
+ bool partial_fetch);
extern int pool_init_memqcache_stats(void);
extern POOL_QUERY_CACHE_STATS * pool_get_memqcache_stats(void);
query = session_context->query_context->query_w_hex;
node = pool_get_parse_tree();
state = TSTATE(backend, MAIN_NODE_ID);
- pool_handle_query_cache(backend, query, node, state);
+
+ /*
+ * If some rows have been fetched by an execute with non 0 row option,
+ * we do not create cache.
+ */
+ pool_handle_query_cache(backend, query, node, state,
+ session_context->query_context->partial_fetch);
+
+ /*
+ * CommandComplete guarantees that all rows have been fetched. We
+ * can unconditionally set atEnd flag.
+ */
+ session_context->query_context->atEnd = true;
}
}
* If the query is SELECT from table to cache, try to fetch cached
* result.
*/
- status = pool_fetch_from_memory_cache(frontend, backend, contents, &foundp);
+ status = pool_fetch_from_memory_cache(frontend, backend, contents, false, &foundp);
if (status != POOL_CONTINUE)
return status;
POOL_QUERY_CONTEXT *query_context;
POOL_SENT_MESSAGE *bind_msg;
bool foundp = false;
+ int num_rows;
+ char *p;
/* Get session context */
session_context = pool_get_session_context(false);
ereport(DEBUG2,
(errmsg("Execute: portal name <%s>", contents)));
+ /* obtain number of returning rows */
+ p = contents + strlen(contents) + 1;
+ memcpy(&num_rows, p, sizeof(num_rows));
+
bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED);
if (!bind_msg)
ereport(FATAL,
node = bind_msg->query_context->parse_tree;
query = bind_msg->query_context->original_query;
+ /*
+ * If execute message's parameter is not 0, set partial_fetch flag to true
+ * so that subsequent execute message knows that the portal started with
+ * partial fetching.
+ */
+ if (num_rows != 0)
+ {
+ query_context->partial_fetch = true;
+ elog(DEBUG1, "set partial_fetch in execute");
+ }
+ elog(DEBUG1, "execute: partial_fetch: %d", query_context->partial_fetch);
+
strlcpy(query_string_buffer, query, sizeof(query_string_buffer));
ereport(DEBUG2, (errmsg("Execute: query string = <%s>", query)));
ereport(LOG, (errmsg("statement: %s", query)));
/*
- * Fetch memory cache if possible
+ * Fetch memory cache if possible. Also if atEnd is false or the execute
+ * message has 0 row argument, we maybe able to use cache.
+ * If partial_fetch is true, cannot use cache.
*/
if (pool_config->memory_cache_enabled && !pool_is_writing_transaction() &&
(TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E')
- && pool_is_likely_select(query) && !query_cache_disabled())
+ && pool_is_likely_select(query) && !query_cache_disabled() &&
+ (query_context->atEnd || num_rows == 0) &&
+ !query_context->partial_fetch)
{
POOL_STATUS status;
char *search_query = NULL;
* If the query is SELECT from table to cache, try to fetch cached
* result.
*/
- status = pool_fetch_from_memory_cache(frontend, backend, search_query, &foundp);
+
+ /*
+ * If we are in streaming replication mode and use extended query
+ * protocol, pass the information atEnd which represents whether all
+ * rows in the portal has been already retrieved. If so,
+ * pool_fetch_from_memory_cache will return "CommandComplete 0" cache.
+ */
+ status = pool_fetch_from_memory_cache(frontend, backend, search_query,
+ query_context->atEnd, &foundp);
if (status != POOL_CONTINUE)
return status;
errdetail("cannot get the query context")));
}
+ /*
+ * Since now that fresh portal is created, we reset atEnd and
+ * partial_fetch flag.
+ */
+ query_context->atEnd = false;
+ query_context->partial_fetch = false;
+
/*
* If the query can be cached, save its offset of query text in bind
* message's content.
if (session_context->query_context &&
session_context->query_context->query_state[MAIN_NODE_ID] == POOL_EXECUTE_COMPLETE)
{
- pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state);
+ pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state, false);
if (session_context->query_context->query_w_hex)
pfree(session_context->query_context->query_w_hex);
session_context->query_context->query_w_hex = NULL;
state = 'I'; /* XXX I don't think query cache works
* with PROTO2 protocol */
}
- pool_handle_query_cache(backend, query, node, state);
+ pool_handle_query_cache(backend, query, node, state, false);
}
}
}
static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element);
static bool is_free_hash_element(void);
static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen);
+static char *create_fake_cache(size_t *len);
/*
* if true, shared memory is locked in this process now.
/*
* Fetch SELECT data from cache if possible.
+ *
+ * If use_fake_cache is true, make up "CommandComplete 0" result and use it.
*/
POOL_STATUS
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
POOL_CONNECTION_POOL * backend,
- char *contents, bool *foundp)
+ char *contents, bool use_fake_cache, bool *foundp)
{
char *qcache;
size_t qcachelen;
session_context = pool_get_session_context(true);
target_backend = CONNECTION(backend, session_context->load_balance_node_id);
+
+ /*
+ * If use_fake_cache is true, make up "CommandComplete (0)" response.
+ */
+ if (use_fake_cache)
+ {
+ pfree(qcache);
+ qcache = create_fake_cache(&qcachelen);
+ elog(DEBUG2, "fake_cach: len: %ld", qcachelen);
+ }
inject_cached_message(target_backend, qcache, qcachelen);
}
else
return POOL_CONTINUE;
}
+/*
+ * Make up response packet for "CommandComplete (SELECT 0)" message.
+ */
+static char *
+create_fake_cache(size_t *len)
+{
+ char *qcache, *p;
+ int32 mlen; /* message length including self */
+ static char* msg = "SELECT 0";
+
+ *len = sizeof(char) + /* message kind */
+ sizeof(int32) + /* packet length including self */
+ sizeof(msg); /* Command Complete message with 0 row returned */
+ mlen = *len - 1; /* message length does not include message kind */
+ mlen = htonl(mlen);
+ p = qcache = palloc(*len);
+ *p++ = 'C';
+ memcpy(p, &mlen, sizeof(mlen));
+ p += sizeof(mlen);
+ strncpy(p, msg, strlen(msg) + 1);
+ return qcache;
+}
+
/*
* Simple and rough (thus unreliable) check if the query is likely
* SELECT. Just check if the query starts with SELECT or WITH. This
* For other case At Ready for Query handle query cache.
*/
void
-pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state)
+pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state,
+ bool partial_fetch)
{
POOL_SESSION_CONTEXT *session_context;
pool_sigset_t oldmask;
session_context = pool_get_session_context(true);
/* Ok to cache SELECT result? */
- if (pool_is_cache_safe() && !query_cache_disabled())
+ if (!partial_fetch && pool_is_cache_safe() && !query_cache_disabled())
{
SelectContext ctx;
MemoryContext old_context;
/* Discard buffered data */
pool_reset_memqcache_buffer(true);
}
+ else if (partial_fetch) /* cannot create cache because of partial fetch */
+ {
+ /* Discard buffered data */
+ pool_reset_memqcache_buffer(true);
+ }
+
else if (is_commit_query(node)) /* Commit? */
{
int num_caches;
--- /dev/null
+FE=> Parse(stmt="", query="SELECT * FROM (VALUES(1),(2)) v")
+FE=> Bind(stmt="", portal="")
+FE=> Describe(portal="")
+FE=> Execute(portal="")
+FE=> Execute(portal="")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE PortalSuspended
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="", query="SELECT * FROM (VALUES(1),(2)) v")
+FE=> Bind(stmt="", portal="")
+FE=> Describe(portal="")
+FE=> Execute(portal="")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT * FROM (VALUES(1),(2)) v")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Sync
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE CommandComplete(SELECT 0)
+<= BE ReadyForQuery(I)
+FE=> Terminate
--- /dev/null
+# Since the first Eexecute returns portal suspended, the second
+# E does not create cache.
+'P' "" "SELECT * FROM (VALUES(1),(2)) v" 0
+'B' "" "" 0 0 0
+'D' 'P' ""
+'E' "" 1
+'E' "" 0
+'S'
+'Y'
+
+# Since no cache was created, two rows are returned,
+# and cache created.
+'P' "" "SELECT * FROM (VALUES(1),(2)) v" 0
+'B' "" "" 0 0 0
+'D' 'P' ""
+'E' "" 0
+'S'
+'Y'
+
+# This time cache is used. 2 rows are returned.
+'P' "S1" "SELECT * FROM (VALUES(1),(2)) v" 0
+'B' "P1" "S1" 0 0 0
+'D' 'P' "P1"
+'E' "P1" 0
+'S'
+'Y'
+
+# cache is used, 2 rows are returned.
+# but the second E returns 0 rows.
+'B' "P1" "S1" 0 0 0
+'D' 'P' "P1"
+'E' "P1" 0
+'E' "P1" 0
+'S'
+'Y'
+
+'X'
echo "memory_cache_enabled = on" >> etc/pgpool.conf
cd ..
-for i in 1 2 3
+for i in 1 2 3 4 4
do
#
# case 1: failed with kind mismatch error at #5.
# case 2: step #4 includes error (hung).
#
# case 3: step #4 includes PortalSuspended (hung).
+ #
+ # case 4: various cases including portal suspended
+ # Note that case4 is executed twice to make sure that
+ # the test works for either query cache exists or does not exist
cd $TESTDIR
./startall
wait_for_pgpool_startup