diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 116 |
1 files changed, 29 insertions, 87 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 39a2502298..03f1752347 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -145,7 +145,7 @@ static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection, static TupleTableSlot * RemoteQueryNext(RemoteQueryState *node); static char *generate_begin_command(void); -static bool pgxc_node_remote_prepare(char *prepareGID, bool implicit); +static bool pgxc_node_remote_prepare(char *prepareGID); static void pgxc_node_remote_commit(void); static void pgxc_node_remote_abort(void); static char *pgxc_node_get_nodelist(bool localNode); @@ -1556,34 +1556,20 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections, * prepareGID is created and passed from xact.c */ static bool -pgxc_node_remote_prepare(char *prepareGID, bool implicit) +pgxc_node_remote_prepare(char *prepareGID) { - int result = 0; - int write_conn_count = remoteXactState.numWriteRemoteNodes; - int read_conn_count = remoteXactState.numReadRemoteNodes; + int result = 0; + int write_conn_count = remoteXactState.numWriteRemoteNodes; char prepare_cmd[256]; - char commit_cmd[256]; - char *cmd_to_send; - int i; - PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles; - RemoteQueryState *combiner = NULL; - RemoteXactNodeStatus rxns; - RemoteXactNodeStatus rxns_ok; - RemoteXactStatus rxs; - - /* In case of implicit prepare nothing to do on read only nodes */ - if (implicit) - read_conn_count = 0; + int i; + PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles; + RemoteQueryState *combiner = NULL; - /* - * Was begin sent to any node? - * if no, no point sending either prepare transaction - * or commit transaction to any of them - * Probably user did a begin and immediately followed it by - * a prepare transaction - * Return also if the caller does not want to run 2PC protocol + /* + * If there is NO write activity or the caller does not want us to run a + * 2PC protocol, we don't need to do anything special */ - if (((read_conn_count + write_conn_count) == 0) || prepareGID == NULL) + if ((write_conn_count == 0) || (prepareGID == NULL)) return false; /* Save the prepareGID in the global state information */ @@ -1592,33 +1578,8 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) /* Generate the PREPARE TRANSACTION command */ sprintf(prepare_cmd, "PREPARE TRANSACTION '%s'", remoteXactState.prepareGID); - /* - * Generate the COMMIT TRANSACTION command for nodes - * where this transaction was not prepared because - * nothing was written to them by this transaction - * but transaction was started on those nodes - * so it has to be ended gracefully - */ - sprintf(commit_cmd, "COMMIT TRANSACTION"); - - cmd_to_send = (char *)&prepare_cmd; - rxns = RXACT_NODE_PREPARE_FAILED; - rxs = RXACT_PREPARE_FAILED; - rxns_ok = RXACT_NODE_PREPARE_SENT; - for (i = 0; i < write_conn_count + read_conn_count; i++) + for (i = 0; i < write_conn_count; i++) { - /* - * As long as we are dealing with write nodes send prepare transaction, - * otherwise commit transaction - */ - if (i >= write_conn_count) - { - cmd_to_send = (char *)&commit_cmd; - rxns = RXACT_NODE_COMMIT_FAILED; - rxs = RXACT_COMMIT_FAILED; - rxns_ok = RXACT_NODE_COMMIT_SENT; - } - /* * PGXCTODO - We should actually make sure that the connection state is * IDLE when we reach here. The executor should have guaranteed that @@ -1635,10 +1596,10 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) * Now we are ready to PREPARE the transaction. Any error at this point * can be safely ereport-ed and the transaction will be aborted. */ - if (pgxc_node_send_query(connections[i], cmd_to_send)) + if (pgxc_node_send_query(connections[i], prepare_cmd)) { - remoteXactState.remoteNodeStatus[i] = rxns; - remoteXactState.status = rxs; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; + remoteXactState.status = RXACT_PREPARE_FAILED; ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("failed to send PREPARE TRANSACTION command to " @@ -1646,7 +1607,7 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) } else { - remoteXactState.remoteNodeStatus[i] = rxns_ok; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_SENT; /* Let the HandleCommandComplete know response checking is enable */ connections[i]->ck_resp_rollback = RESP_ROLLBACK_CHECK; } @@ -1665,11 +1626,11 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) * good for parallel proessing, but I think we should have a leak-proof * mechanism to track connection status */ - if (write_conn_count + read_conn_count) + if (write_conn_count) { - combiner = CreateResponseCombiner(write_conn_count + read_conn_count, COMBINE_TYPE_NONE); + combiner = CreateResponseCombiner(write_conn_count, COMBINE_TYPE_NONE); /* Receive responses */ - result = pgxc_node_receive_responses(write_conn_count + read_conn_count, connections, NULL, combiner); + result = pgxc_node_receive_responses(write_conn_count, connections, NULL, combiner); if (result || !validate_combiner(combiner)) result = EOF; else @@ -1678,46 +1639,28 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) combiner = NULL; } - rxns = RXACT_NODE_PREPARE_FAILED; - rxs = RXACT_PREPARE_FAILED; - rxns_ok = RXACT_NODE_PREPARED; - for (i = 0; i < write_conn_count + read_conn_count; i++) + for (i = 0; i < write_conn_count; i++) { - /* - * As long as we are dealing with write nodes error status - * will be for prepare transaction, otherwise commit transaction - */ - if (i >= write_conn_count) - { - rxns = RXACT_NODE_COMMIT_FAILED; - rxs = RXACT_COMMIT_FAILED; - rxns_ok = RXACT_NODE_COMMITTED; - } - - if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT || - remoteXactState.remoteNodeStatus[i] == RXACT_NODE_COMMIT_SENT) + if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT) { if (connections[i]->error) { - remoteXactState.remoteNodeStatus[i] = rxns; - remoteXactState.status = rxs; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; + remoteXactState.status = RXACT_PREPARE_FAILED; } else { /* Did we receive ROLLBACK in response to PREPARE TRANSCATION? */ if (connections[i]->ck_resp_rollback == RESP_ROLLBACK_RECEIVED) { - /* - * If yes, it means PREPARE TRANSACTION or COMMIT TRANSACTION failed - * report error accordingly - */ - remoteXactState.remoteNodeStatus[i] = rxns; - remoteXactState.status = rxs; + /* If yes, it means PREPARE TRANSACTION failed */ + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; + remoteXactState.status = RXACT_PREPARE_FAILED; result = 0; } else { - remoteXactState.remoteNodeStatus[i] = rxns_ok; + remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARED; } } } @@ -1737,8 +1680,7 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit) elog(ERROR, "failed to PREPARE transaction on one or more nodes"); } - if (remoteXactState.status == RXACT_PREPARE_FAILED || - remoteXactState.status == RXACT_COMMIT_FAILED ) + if (remoteXactState.status == RXACT_PREPARE_FAILED) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to PREPARE the transaction on one or more nodes"))); @@ -4400,7 +4342,7 @@ PrePrepare_Remote(char *prepareGID, bool localNode, bool implicit) * local node. Any errors will be reported via ereport and the transaction * will be aborted accordingly. */ - pgxc_node_remote_prepare(prepareGID, implicit); + pgxc_node_remote_prepare(prepareGID); if (preparedNodes) pfree(preparedNodes); |
