diff options
| author | Michael Paquier | 2012-05-28 06:09:12 +0000 |
|---|---|---|
| committer | Michael Paquier | 2012-05-28 06:19:16 +0000 |
| commit | 50939f47419fa619a6abad2c949aab7b71e69778 (patch) | |
| tree | e78e791458af73d13060383e7e741cb62d5abd8c /src | |
| parent | e66ae01f07e9b70372459a5f197e35b86c22cf62 (diff) | |
Simplify remote node communication at PREPARE phase
This commit fixes issue 3528952, which made a Coordinator sending
COMMIT messages unnecessarily to remote nodes for read connections.
This caused Datanodes to log a lot of warning messages of the type
"WARNING: there is no transaction in progress". This issue was
reproducible when cluster was under a huge load.
Things are simplified in pgxc_node_remote_prepare and this function
does not manage anymore with COMMIT messages, what it was not meant
to do by the way.
The modifications of this commit revert partially things implemented
in commit 077b37369a4b26b5aa0db7b23667a1803e2af4de but does not
change the process of SELECT FOR UPDATE transactions as connections
using such queries are registed as write connections to remote nodes.
All the test cases introduced by commits 077b373 and 9e66733 are kept.
They pass correctly with this patch, and measurements with DBT-1 are
passing correctly.
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 116 |
1 files changed, 29 insertions, 87 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 39a2502298..03f1752347 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -145,7 +145,7 @@ static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection, static TupleTableSlot * RemoteQueryNext(RemoteQueryState *node); static char *generate_begin_command(void); -static bool pgxc_node_remote_prepare(char *prepareGID, bool implicit); +static bool pgxc_node_remote_prepare(char *prepareGID); static void pgxc_node_remote_commit(void); static void pgxc_node_remote_abort(void); static char *pgxc_node_get_nodelist(bool localNode); @@ -1556,34 +1556,20 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections, * prepareGID is created and passed from xact.c */ static bool -pgxc_node_remote_prepare(char *prepareGID, bool implicit) +pgxc_node_remote_prepare(char *prepareGID) { - int result = 0; - int write_conn_count = remoteXactState.numWriteRemoteNodes; - int read_conn_count = remoteXactState.numReadRemoteNodes; + int result = 0; + int write_conn_count = remoteXactState.numWriteRemoteNodes; char prepare_cmd[256]; - char commit_cmd[256]; - char *cmd_to_send; - int i; - PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles; - RemoteQueryState *combiner = NULL; - RemoteXactNodeStatus rxns; - RemoteXactNodeStatus rxns_ok; - RemoteXactStatus rxs; - - /* In case of implicit prepare nothing to do on read only nodes */ - if (implicit) - read_conn_count = 0; + int i; + PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles; + RemoteQueryState *combiner = NULL; - /* - * Was begin sent to any node? - * if no, no point sending either prepare transaction - * or commit transaction to any of them - * Probably user did a begin and immediately followed it by - * a prepare transaction - * Return also if the caller does not want to run 2PC protocol + /* + * If there is NO write activity or the caller does not want us to run a + * 2PC protocol, we don't need to do anything special */ - if (((read_conn_count + write_conn_count) == 0) || prepareGID == NULL) + if ((write_conn_count == 0) || (prepareGID == NULL)) return false; /* Save the prepareGID in the global state information */ @@ -1592,33 +1578,8 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) /* Generate the PREPARE TRANSACTION command */ sprintf(prepare_cmd, "PREPARE TRANSACTION '%s'", remoteXactState.prepareGID); - /* - * Generate the COMMIT TRANSACTION command for nodes - * where this transaction was not prepared because - * nothing was written to them by this transaction - * but transaction was started on those nodes - * so it has to be ended gracefully - */ - sprintf(commit_cmd, "COMMIT TRANSACTION"); - - cmd_to_send = (char *)&prepare_cmd; - rxns = RXACT_NODE_PREPARE_FAILED; - rxs = RXACT_PREPARE_FAILED; - rxns_ok = RXACT_NODE_PREPARE_SENT; - for (i = 0; i < write_conn_count + read_conn_count; i++) + for (i = 0; i < write_conn_count; i++) { - /* - * As long as we are dealing with write nodes send prepare transaction, - * otherwise commit transaction - */ - if (i >= write_conn_count) - { - cmd_to_send = (char *)&commit_cmd; - rxns = RXACT_NODE_COMMIT_FAILED; - rxs = RXACT_COMMIT_FAILED; - rxns_ok = RXACT_NODE_COMMIT_SENT; - } - /* * PGXCTODO - We should actually make sure that the connection state is * IDLE when we reach here. The executor should have guaranteed that @@ -1635,10 +1596,10 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) * Now we are ready to PREPARE the transaction. Any error at this point * can be safely ereport-ed and the transaction will be aborted. */ - if (pgxc_node_send_query(connections[i], cmd_to_send)) + if (pgxc_node_send_query(connections[i], prepare_cmd)) { - remoteXactState.remoteNodeStatus[i] = rxns; - remoteXactState.status = rxs; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; + remoteXactState.status = RXACT_PREPARE_FAILED; ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("failed to send PREPARE TRANSACTION command to " @@ -1646,7 +1607,7 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) } else { - remoteXactState.remoteNodeStatus[i] = rxns_ok; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_SENT; /* Let the HandleCommandComplete know response checking is enable */ connections[i]->ck_resp_rollback = RESP_ROLLBACK_CHECK; } @@ -1665,11 +1626,11 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) * good for parallel proessing, but I think we should have a leak-proof * mechanism to track connection status */ - if (write_conn_count + read_conn_count) + if (write_conn_count) { - combiner = CreateResponseCombiner(write_conn_count + read_conn_count, COMBINE_TYPE_NONE); + combiner = CreateResponseCombiner(write_conn_count, COMBINE_TYPE_NONE); /* Receive responses */ - result = pgxc_node_receive_responses(write_conn_count + read_conn_count, connections, NULL, combiner); + result = pgxc_node_receive_responses(write_conn_count, connections, NULL, combiner); if (result || !validate_combiner(combiner)) result = EOF; else @@ -1678,46 +1639,28 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) combiner = NULL; } - rxns = RXACT_NODE_PREPARE_FAILED; - rxs = RXACT_PREPARE_FAILED; - rxns_ok = RXACT_NODE_PREPARED; - for (i = 0; i < write_conn_count + read_conn_count; i++) + for (i = 0; i < write_conn_count; i++) { - /* - * As long as we are dealing with write nodes error status - * will be for prepare transaction, otherwise commit transaction - */ - if (i >= write_conn_count) - { - rxns = RXACT_NODE_COMMIT_FAILED; - rxs = RXACT_COMMIT_FAILED; - rxns_ok = RXACT_NODE_COMMITTED; - } - - if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT || - remoteXactState.remoteNodeStatus[i] == RXACT_NODE_COMMIT_SENT) + if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT) { if (connections[i]->error) { - remoteXactState.remoteNodeStatus[i] = rxns; - remoteXactState.status = rxs; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; + remoteXactState.status = RXACT_PREPARE_FAILED; } else { /* Did we receive ROLLBACK in response to PREPARE TRANSCATION? */ if (connections[i]->ck_resp_rollback == RESP_ROLLBACK_RECEIVED) { - /* - * If yes, it means PREPARE TRANSACTION or COMMIT TRANSACTION failed - * report error accordingly - */ - remoteXactState.remoteNodeStatus[i] = rxns; - remoteXactState.status = rxs; + /* If yes, it means PREPARE TRANSACTION failed */ + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; + remoteXactState.status = RXACT_PREPARE_FAILED; result = 0; } else { - remoteXactState.remoteNodeStatus[i] = rxns_ok; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARED; } } } @@ -1737,8 +1680,7 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) elog(ERROR, "failed to PREPARE transaction on one or more nodes"); } - if (remoteXactState.status == RXACT_PREPARE_FAILED || - remoteXactState.status == RXACT_COMMIT_FAILED ) + if (remoteXactState.status == RXACT_PREPARE_FAILED) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to PREPARE the transaction on one or more nodes"))); @@ -4400,7 +4342,7 @@ PrePrepare_Remote(char *prepareGID, bool localNode, bool implicit) * local node. Any errors will be reported via ereport and the transaction * will be aborted accordingly. */ - pgxc_node_remote_prepare(prepareGID, implicit); + pgxc_node_remote_prepare(prepareGID); if (preparedNodes) pfree(preparedNodes); |
