diff options
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 50 |
1 files changed, 41 insertions, 9 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 08eac2faaf..ef37f1a6f4 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -1447,9 +1447,11 @@ BufferConnection(PGXCNodeHandle *conn) * by the datanode */ if (pgxc_node_send_sync(conn) != 0) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to sync msg to node %u", conn->nodeoid))); + } } } else if (res == RESPONSE_SUSPENDED || res == RESPONSE_READY) @@ -1800,19 +1802,30 @@ FetchTuple(ResponseCombiner *combiner) * executed there yet. Return and go on with second phase. */ if (combiner->probing_primary) + { return NULL; + } + if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid))); + } + if (pgxc_node_send_flush(conn) != 0) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid))); + } + if (pgxc_node_receive(1, &conn, NULL)) + { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed receive data from node %u cursor '%s'", conn->nodeoid, combiner->cursor))); + } } /* read messages */ @@ -1829,7 +1842,7 @@ FetchTuple(ResponseCombiner *combiner) if (pgxc_node_receive(1, &conn, NULL)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to receive more data from data node %u", conn->nodeoid))); continue; } else if (res == RESPONSE_SUSPENDED) @@ -1844,17 +1857,36 @@ FetchTuple(ResponseCombiner *combiner) if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid))); if (pgxc_node_send_flush(conn) != 0) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid))); if (pgxc_node_receive(1, &conn, NULL)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed receive node from node %u cursor '%s'", conn->nodeoid, combiner->cursor))); continue; } + + /* + * Tell the node to fetch data in background, next loop when we + * pgxc_node_receive, data is already there, so we can run faster + * */ + if (pgxc_node_send_execute(conn, combiner->cursor, 1000) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed to send execute cursor '%s' to node %u", combiner->cursor, conn->nodeoid))); + } + + if (pgxc_node_send_flush(conn) != 0) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Failed flush cursor '%s' node %u", combiner->cursor, conn->nodeoid))); + } + if (++combiner->current_conn >= combiner->conn_count) combiner->current_conn = 0; conn = combiner->connections[combiner->current_conn]; @@ -1894,7 +1926,7 @@ FetchTuple(ResponseCombiner *combiner) if (pgxc_node_send_sync(conn) != 0) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("Failed to fetch from data node"))); + errmsg("Failed to sync msg to node %u", conn->nodeoid))); } /* * Do not wait for response from primary, it needs to wait |
