summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMason Sharp2014-11-14 15:37:03 +0000
committerPavan Deolasee2015-04-15 05:46:40 +0000
commitc7321ce8157a9029a95eabc98807776d32cd6ac5 (patch)
tree6cf6a7e225ab612e38935568c2441ca3970b4ccd /src
parentdf34d71b549e92050ad39dbf208fc0f5e626b948 (diff)
When fetching from remote nodes, after obtaining initial
group of rows, send down to the node that it should start producing the next batch so it is ready when we are ready to read them. Patch submitted by Jason
Diffstat (limited to 'src')
-rw-r--r--src/backend/pgxc/pool/execRemote.c50
1 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 08eac2faaf..ef37f1a6f4 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -1447,9 +1447,11 @@ BufferConnection(PGXCNodeHandle *conn)
* by the datanode
*/
if (pgxc_node_send_sync(conn) != 0)
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to sync msg to node %u", conn->nodeoid)));
+ }
}
}
else if (res == RESPONSE_SUSPENDED || res == RESPONSE_READY)
@@ -1800,19 +1802,30 @@ FetchTuple(ResponseCombiner *combiner)
* executed there yet. Return and go on with second phase.
*/
if (combiner->probing_primary)
+ {
return NULL;
+ }
+
if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0)
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid)));
+ }
+
if (pgxc_node_send_flush(conn) != 0)
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid)));
+ }
+
if (pgxc_node_receive(1, &conn, NULL))
+ {
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed receive data from node %u cursor '%s'", conn->nodeoid, combiner->cursor)));
+ }
}
/* read messages */
@@ -1829,7 +1842,7 @@ FetchTuple(ResponseCombiner *combiner)
if (pgxc_node_receive(1, &conn, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to receive more data from data node %u", conn->nodeoid)));
continue;
}
else if (res == RESPONSE_SUSPENDED)
@@ -1844,17 +1857,36 @@ FetchTuple(ResponseCombiner *combiner)
if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid)));
if (pgxc_node_send_flush(conn) != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid)));
if (pgxc_node_receive(1, &conn, NULL))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed receive node from node %u cursor '%s'", conn->nodeoid, combiner->cursor)));
continue;
}
+
+ /*
+ * Tell the node to fetch data in background, next loop when we
+ * pgxc_node_receive, data is already there, so we can run faster
+ * */
+ if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid)));
+ }
+
+ if (pgxc_node_send_flush(conn) != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid)));
+ }
+
if (++combiner->current_conn >= combiner->conn_count)
combiner->current_conn = 0;
conn = combiner->connections[combiner->current_conn];
@@ -1894,7 +1926,7 @@ FetchTuple(ResponseCombiner *combiner)
if (pgxc_node_send_sync(conn) != 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Failed to fetch from data node")));
+ errmsg("Failed to sync msg to node %u", conn->nodeoid)));
}
/*
* Do not wait for response from primary, it needs to wait