summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/pgxc/pool/execRemote.c116
1 files changed, 29 insertions, 87 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 39a2502298..03f1752347 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -145,7 +145,7 @@ static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection,
static TupleTableSlot * RemoteQueryNext(RemoteQueryState *node);
static char *generate_begin_command(void);
-static bool pgxc_node_remote_prepare(char *prepareGID, bool implicit);
+static bool pgxc_node_remote_prepare(char *prepareGID);
static void pgxc_node_remote_commit(void);
static void pgxc_node_remote_abort(void);
static char *pgxc_node_get_nodelist(bool localNode);
@@ -1556,34 +1556,20 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections,
* prepareGID is created and passed from xact.c
*/
static bool
-pgxc_node_remote_prepare(char *prepareGID, bool implicit)
+pgxc_node_remote_prepare(char *prepareGID)
{
- int result = 0;
- int write_conn_count = remoteXactState.numWriteRemoteNodes;
- int read_conn_count = remoteXactState.numReadRemoteNodes;
+ int result = 0;
+ int write_conn_count = remoteXactState.numWriteRemoteNodes;
char prepare_cmd[256];
- char commit_cmd[256];
- char *cmd_to_send;
- int i;
- PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles;
- RemoteQueryState *combiner = NULL;
- RemoteXactNodeStatus rxns;
- RemoteXactNodeStatus rxns_ok;
- RemoteXactStatus rxs;
-
- /* In case of implicit prepare nothing to do on read only nodes */
- if (implicit)
- read_conn_count = 0;
+ int i;
+ PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles;
+ RemoteQueryState *combiner = NULL;
- /*
- * Was begin sent to any node?
- * if no, no point sending either prepare transaction
- * or commit transaction to any of them
- * Probably user did a begin and immediately followed it by
- * a prepare transaction
- * Return also if the caller does not want to run 2PC protocol
+ /*
+ * If there is NO write activity or the caller does not want us to run a
+ * 2PC protocol, we don't need to do anything special
*/
- if (((read_conn_count + write_conn_count) == 0) || prepareGID == NULL)
+ if ((write_conn_count == 0) || (prepareGID == NULL))
return false;
/* Save the prepareGID in the global state information */
@@ -1592,33 +1578,8 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit)
/* Generate the PREPARE TRANSACTION command */
sprintf(prepare_cmd, "PREPARE TRANSACTION '%s'", remoteXactState.prepareGID);
- /*
- * Generate the COMMIT TRANSACTION command for nodes
- * where this transaction was not prepared because
- * nothing was written to them by this transaction
- * but transaction was started on those nodes
- * so it has to be ended gracefully
- */
- sprintf(commit_cmd, "COMMIT TRANSACTION");
-
- cmd_to_send = (char *)&prepare_cmd;
- rxns = RXACT_NODE_PREPARE_FAILED;
- rxs = RXACT_PREPARE_FAILED;
- rxns_ok = RXACT_NODE_PREPARE_SENT;
- for (i = 0; i < write_conn_count + read_conn_count; i++)
+ for (i = 0; i < write_conn_count; i++)
{
- /*
- * As long as we are dealing with write nodes send prepare transaction,
- * otherwise commit transaction
- */
- if (i >= write_conn_count)
- {
- cmd_to_send = (char *)&commit_cmd;
- rxns = RXACT_NODE_COMMIT_FAILED;
- rxs = RXACT_COMMIT_FAILED;
- rxns_ok = RXACT_NODE_COMMIT_SENT;
- }
-
/*
* PGXCTODO - We should actually make sure that the connection state is
* IDLE when we reach here. The executor should have guaranteed that
@@ -1635,10 +1596,10 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit)
* Now we are ready to PREPARE the transaction. Any error at this point
* can be safely ereport-ed and the transaction will be aborted.
*/
- if (pgxc_node_send_query(connections[i], cmd_to_send))
+ if (pgxc_node_send_query(connections[i], prepare_cmd))
{
- remoteXactState.remoteNodeStatus[i] = rxns;
- remoteXactState.status = rxs;
+ remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED;
+ remoteXactState.status = RXACT_PREPARE_FAILED;
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("failed to send PREPARE TRANSACTION command to "
@@ -1646,7 +1607,7 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit)
}
else
{
- remoteXactState.remoteNodeStatus[i] = rxns_ok;
+ remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_SENT;
/* Let the HandleCommandComplete know response checking is enable */
connections[i]->ck_resp_rollback = RESP_ROLLBACK_CHECK;
}
@@ -1665,11 +1626,11 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit)
* good for parallel proessing, but I think we should have a leak-proof
* mechanism to track connection status
*/
- if (write_conn_count + read_conn_count)
+ if (write_conn_count)
{
- combiner = CreateResponseCombiner(write_conn_count + read_conn_count, COMBINE_TYPE_NONE);
+ combiner = CreateResponseCombiner(write_conn_count, COMBINE_TYPE_NONE);
/* Receive responses */
- result = pgxc_node_receive_responses(write_conn_count + read_conn_count, connections, NULL, combiner);
+ result = pgxc_node_receive_responses(write_conn_count, connections, NULL, combiner);
if (result || !validate_combiner(combiner))
result = EOF;
else
@@ -1678,46 +1639,28 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit)
combiner = NULL;
}
- rxns = RXACT_NODE_PREPARE_FAILED;
- rxs = RXACT_PREPARE_FAILED;
- rxns_ok = RXACT_NODE_PREPARED;
- for (i = 0; i < write_conn_count + read_conn_count; i++)
+ for (i = 0; i < write_conn_count; i++)
{
- /*
- * As long as we are dealing with write nodes error status
- * will be for prepare transaction, otherwise commit transaction
- */
- if (i >= write_conn_count)
- {
- rxns = RXACT_NODE_COMMIT_FAILED;
- rxs = RXACT_COMMIT_FAILED;
- rxns_ok = RXACT_NODE_COMMITTED;
- }
-
- if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT ||
- remoteXactState.remoteNodeStatus[i] == RXACT_NODE_COMMIT_SENT)
+ if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT)
{
if (connections[i]->error)
{
- remoteXactState.remoteNodeStatus[i] = rxns;
- remoteXactState.status = rxs;
+ remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED;
+ remoteXactState.status = RXACT_PREPARE_FAILED;
}
else
{
/* Did we receive ROLLBACK in response to PREPARE TRANSCATION? */
if (connections[i]->ck_resp_rollback == RESP_ROLLBACK_RECEIVED)
{
- /*
- * If yes, it means PREPARE TRANSACTION or COMMIT TRANSACTION failed
- * report error accordingly
- */
- remoteXactState.remoteNodeStatus[i] = rxns;
- remoteXactState.status = rxs;
+ /* If yes, it means PREPARE TRANSACTION failed */
+ remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED;
+ remoteXactState.status = RXACT_PREPARE_FAILED;
result = 0;
}
else
{
- remoteXactState.remoteNodeStatus[i] = rxns_ok;
+ remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARED;
}
}
}
@@ -1737,8 +1680,7 @@ pgxc_node_remote_prepare(char *prepareGID, bool implicit)
elog(ERROR, "failed to PREPARE transaction on one or more nodes");
}
- if (remoteXactState.status == RXACT_PREPARE_FAILED ||
- remoteXactState.status == RXACT_COMMIT_FAILED )
+ if (remoteXactState.status == RXACT_PREPARE_FAILED)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to PREPARE the transaction on one or more nodes")));
@@ -4400,7 +4342,7 @@ PrePrepare_Remote(char *prepareGID, bool localNode, bool implicit)
* local node. Any errors will be reported via ereport and the transaction
* will be aborted accordingly.
*/
- pgxc_node_remote_prepare(prepareGID, implicit);
+ pgxc_node_remote_prepare(prepareGID);
if (preparedNodes)
pfree(preparedNodes);