summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/pgxc/pool/execRemote.c50
1 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 08eac2faaf..ef37f1a6f4 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -1447,9 +1447,11 @@ BufferConnection(PGXCNodeHandle *conn)
* by the datanode
*/
if (pgxc_node_send_sync(conn) != 0)
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to sync msg to node %u", conn->nodeoid)));
+ }
}
}
else if (res == RESPONSE_SUSPENDED || res == RESPONSE_READY)
@@ -1800,19 +1802,30 @@ FetchTuple(ResponseCombiner *combiner)
* executed there yet. Return and go on with second phase.
*/
if (combiner->probing_primary)
+ {
return NULL;
+ }
+
if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0)
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid)));
+ }
+
if (pgxc_node_send_flush(conn) != 0)
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid)));
+ }
+
if (pgxc_node_receive(1, &conn, NULL))
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed receive data from node %u cursor '%s'", conn->nodeoid, combiner->cursor)));
+ }
}
/* read messages */
@@ -1829,7 +1842,7 @@ FetchTuple(ResponseCombiner *combiner)
if (pgxc_node_receive(1, &conn, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to receive more data from data node %u", conn->nodeoid)));
continue;
}
else if (res == RESPONSE_SUSPENDED)
@@ -1844,17 +1857,36 @@ FetchTuple(ResponseCombiner *combiner)
if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid)));
if (pgxc_node_send_flush(conn) != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid)));
if (pgxc_node_receive(1, &conn, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed receive node from node %u cursor '%s'", conn->nodeoid, combiner->cursor)));
continue;
}
+
+ /*
+ * Tell the node to fetch data in background, next loop when we
+ * pgxc_node_receive, data is already there, so we can run faster
+ * */
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid)));
+ }
+
+ if (pgxc_node_send_flush(conn) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid)));
+ }
+
if (++combiner->current_conn >= combiner->conn_count)
combiner->current_conn = 0;
conn = combiner->connections[combiner->current_conn];
@@ -1894,7 +1926,7 @@ FetchTuple(ResponseCombiner *combiner)
if (pgxc_node_send_sync(conn) != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to sync msg to node %u", conn->nodeoid)));
}
/*
* Do not wait for response from primary, it needs to wait