diff options
| author | Mason Sharp | 2014-11-14 15:37:03 +0000 |
|---|---|---|
| committer | Pavan Deolasee | 2015-04-15 05:46:40 +0000 |
| commit | c7321ce8157a9029a95eabc98807776d32cd6ac5 (patch) | |
| tree | 6cf6a7e225ab612e38935568c2441ca3970b4ccd /src | |
| parent | df34d71b549e92050ad39dbf208fc0f5e626b948 (diff) | |
When fetching from remote nodes, after obtaining initial
group of rows, send down to the node that it should start producing
the next batch so it is ready when we are ready to read them.
Patch submitted by Jason
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 50 |
1 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 08eac2faaf..ef37f1a6f4 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -1447,9 +1447,11 @@ BufferConnection(PGXCNodeHandle *conn) * by the datanode */ if (pgxc_node_send_sync(conn) != 0) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to sync msg to node %u", conn->nodeoid))); + } } } else if (res == RESPONSE_SUSPENDED || res == RESPONSE_READY) @@ -1800,19 +1802,30 @@ FetchTuple(ResponseCombiner *combiner) * executed there yet. Return and go on with second phase. */ if (combiner->probing_primary) + { return NULL; + } + if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid))); + } + if (pgxc_node_send_flush(conn) != 0) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid))); + } + if (pgxc_node_receive(1, &conn, NULL)) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed receive data from node %u cursor '%s'", conn->nodeoid, combiner->cursor))); + } } /* read messages */ @@ -1829,7 +1842,7 @@ FetchTuple(ResponseCombiner *combiner) if (pgxc_node_receive(1, &conn, NULL)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to receive more data from data node %u", conn->nodeoid))); continue; } else if (res == RESPONSE_SUSPENDED) @@ -1844,17 +1857,36 @@ FetchTuple(ResponseCombiner *combiner) if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid))); if (pgxc_node_send_flush(conn) != 0) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid))); if (pgxc_node_receive(1, &conn, NULL)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed receive node from node %u cursor '%s'", conn->nodeoid, combiner->cursor))); continue; } + + /* + * Tell the node to fetch data in background, next loop when we + * pgxc_node_receive, data is already there, so we can run faster + * */ + if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid))); + } + + if (pgxc_node_send_flush(conn) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid))); + } + if (++combiner->current_conn >= combiner->conn_count) combiner->current_conn = 0; conn = combiner->connections[combiner->current_conn]; @@ -1894,7 +1926,7 @@ FetchTuple(ResponseCombiner *combiner) if (pgxc_node_send_sync(conn) != 0) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to sync msg to node %u", conn->nodeoid))); } /* * Do not wait for response from primary, it needs to wait |
