int i = 0;
bool is_prepared_stmt = false;
POOL_SESSION_CONTEXT *session_context;
- POOL_QUERY_CONTEXT *query_context;
POOL_PENDING_MESSAGE *msg;
+ int num_msgs;
+ int msg_cnt = 0;
session_context = pool_get_session_context(false);
- query_context = session_context->query_context;
- msg = pool_pending_message_find_lastest_by_query_context(query_context);
+ msg = pool_pending_message_head_message();
+ num_msgs = list_length(session_context->pending_messages);
if (msg)
{
/*
- * If pending message found, we should extract target backend from it
+ * If pending message found, we should extract data from the target
+ * backend.
*/
int backend_id;
backend_id = pool_pending_message_get_target_backend_id(msg);
+ pool_pending_message_free_pending_message(msg);
backend = CONNECTION(session_context->backend, backend_id);
timeout = -1;
}
pool_set_timeout(-1);
}
+ /* read one message from backend */
pool_read(backend, &kind, 1);
- ereport(DEBUG1,
- (errmsg("inject_cached_message: push message kind: '%c'", kind)));
- if (msg &&
- ((kind == 'T' && msg->type == POOL_DESCRIBE) ||
- (kind == '2' && msg->type == POOL_BIND)))
+
+ /*
+ * Count up number of received messages to compare with the number of
+ * pending messages
+ */
+ switch(kind)
+ {
+ case '1': /* parse complete */
+ case '2': /* bind complete */
+ case '3': /* close complete */
+ case 'C': /* command complete */
+ case 's': /* portal suspended */
+ case 'T': /* row description */
+ msg_cnt++; /* count up number of messages */
+ elog(DEBUG1, "count up message %c msg_cnt: %d", kind, msg_cnt);
+ break;
+ case 'E': /* ErrorResponse */
+ /*
+ * If we receive ErrorResponse, it is likely that the last
+ * Execute caused an error and we can stop reading messsages
+ * from backend.
+ */
+ timeout = 0;
+ break;
+ default:
+ /* we do not count other messages */
+ break;
+ }
+
+ /*
+ * If msg count is greater than or equal to the number of pending
+ * messages, it is likely all necessary backend messages have been
+ * already seen.
+ */
+ if (msg_cnt >= num_msgs)
{
- /* Pending message seen. Now it is likely to end of pending data */
+ /*
+ * Set timeout to 0 so that we do not need to wait for responses
+ * from backend in vain.
+ */
timeout = 0;
+ elog(DEBUG1, "num_msgs: %d msg_cnt: %d", num_msgs, msg_cnt);
}
pool_push(backend, &kind, sizeof(kind));
pool_read(backend, &len, sizeof(len));
--- /dev/null
+FE=> Parse(stmt="S0", query="SELECT 1")
+FE=> Bind(stmt="S0", portal="P0")
+FE=> Describe(portal="P0")
+FE=> Execute(portal="P0")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT 2")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Bind(stmt="S0", portal="P2")
+FE=> Describe(portal="P2")
+FE=> Execute(portal="P2")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Terminate
--- /dev/null
+FE=> Parse(stmt="S0", query="SELECT 1")
+FE=> Bind(stmt="S0", portal="P0")
+FE=> Describe(portal="P0")
+FE=> Execute(portal="P0")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE CommandComplete(SELECT 1)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT setseed(10)")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Bind(stmt="S0", portal="P2")
+FE=> Describe(portal="P2")
+FE=> Execute(portal="P2")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE ErrorResponse(S ERROR V ERROR C 22023 M setseed parameter 10 is out of allowed range [-1,1]
+<= BE ReadyForQuery(I)
+FE=> Terminate
--- /dev/null
+FE=> Parse(stmt="S0", query="SELECT * FROM (VALUES(1),(2))")
+FE=> Bind(stmt="S0", portal="P0")
+FE=> Describe(portal="P0")
+FE=> Execute(portal="P0")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Parse(stmt="S1", query="SELECT * FROM (VALUES(1),(2),(3))")
+FE=> Bind(stmt="S1", portal="P1")
+FE=> Describe(portal="P1")
+FE=> Execute(portal="P1")
+FE=> Bind(stmt="S0", portal="P2")
+FE=> Describe(portal="P2")
+FE=> Execute(portal="P2")
+FE=> Sync
+<= BE ParseComplete
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE PortalSuspended
+<= BE BindComplete
+<= BE RowDescription
+<= BE DataRow
+<= BE DataRow
+<= BE CommandComplete(SELECT 2)
+<= BE ReadyForQuery(I)
+FE=> Terminate
--- /dev/null
+# "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']"
+'P' "S0" "SELECT 1" 0
+'B' "P0" "S0" 0 0 0
+'D' 'P' "P0"
+'E' "P0" 0
+'S'
+'Y'
+
+'P' "S1" "SELECT 2" 0
+'B' "P1" "S1" 0 0 0
+'D' 'P' "P1"
+'E' "P1" 0
+
+'B' "P2" "S0" 0 0 0
+'D' 'P' "P2"
+'E' "P2" 0
+
+'S'
+'Y'
+
+'X'
--- /dev/null
+# test case including error response
+
+'P' "S0" "SELECT 1" 0
+'B' "P0" "S0" 0 0 0
+'D' 'P' "P0"
+'E' "P0" 0
+'S'
+'Y'
+
+# setseed parameter out of range error
+'P' "S1" "SELECT setseed(10)" 0
+'B' "P1" "S1" 0 0 0
+'D' 'P' "P1"
+'E' "P1" 0
+
+'B' "P2" "S0" 0 0 0
+'D' 'P' "P2"
+'E' "P2" 0
+
+'S'
+'Y'
+
+'X'
--- /dev/null
+# Portal suspended test
+
+'P' "S0" "SELECT * FROM (VALUES(1),(2))" 0
+'B' "P0" "S0" 0 0 0
+'D' 'P' "P0"
+'E' "P0" 0
+'S'
+'Y'
+
+'P' "S1" "SELECT * FROM (VALUES(1),(2),(3))" 0
+'B' "P1" "S1" 0 0 0
+'D' 'P' "P1"
+'E' "P1" 1
+
+'B' "P2" "S0" 0 0 0
+'D' 'P' "P2"
+'E' "P2" 0
+'S'
+'Y'
+
+'X'
cd ..
done
+#
+# Test for extended query protocol coner cases in streaming replication mode.
+# These tests are basically for a sequence of extended queries:
+# 1. execute a SELECT and create query cache entry.
+# 2. sync.
+# 3. execute another a SELECT.
+# 4. execute bind and execute to use the query cache created at #1.
+# 5. sync.
+
+rm -fr $TESTDIR
+mkdir $TESTDIR
+cd $TESTDIR
+
+# create test environment
+echo -n "creating test environment..."
+$PGPOOL_SETUP -m s -n 2 || exit 1
+echo "done."
+
+echo "memory_cache_enabled = on" >> etc/pgpool.conf
+cd ..
+
+for i in 1 2 3
+do
+ #
+ # case 1: failed with kind mismatch error at #5.
+ # "packet kind of backend 0 ['T'] does not match with main/majority nodes packet kind ['Z']"
+ #
+ # case 2: step #4 includes error (hung).
+ #
+ # case 3: step #4 includes PortalSuspended (hung).
+ cd $TESTDIR
+ ./startall
+ wait_for_pgpool_startup
+ timeout 1 $PGPROTO -d test -f ../query_cache_bug$i.data |& del_details_from_error > result
+ if [ $? != 0 ];then
+ # timeout happened or pgproto returned non 0 status
+ echo "test failed in test case #2 (timeout)"
+ err=true
+ ./shutdownall
+ exit 1
+ fi
+ ./shutdownall
+ cd ..
+ diff -c expected.$i $TESTDIR/result > $log
+ if [ $? != 0 ];then
+ echo "test failed in test case $i"
+ cat $log
+ rm $log
+ exit 1
+ fi
+done
+
exit 0