diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/optimizer/plan/createplan.c | 2 | ||||
-rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 116 | ||||
-rw-r--r-- | src/include/pgxc/planner.h | 60 | ||||
-rw-r--r-- | src/test/regress/expected/xc_for_update.out | 3 | ||||
-rw-r--r-- | src/test/regress/expected/xc_prepared_xacts.out | 457 | ||||
-rw-r--r-- | src/test/regress/parallel_schedule | 3 | ||||
-rw-r--r-- | src/test/regress/serial_schedule | 1 | ||||
-rw-r--r-- | src/test/regress/sql/xc_for_update.sql | 2 | ||||
-rw-r--r-- | src/test/regress/sql/xc_prepared_xacts.sql | 262 |
9 files changed, 844 insertions, 62 deletions
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index f5939c2597..2014a73b9e 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -2773,6 +2773,8 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path, /* Track if the remote query involves a temporary object */ scan_plan->is_temp = IsTempTable(rte->relid); + scan_plan->read_only = query->commandType == CMD_SELECT; + scan_plan->sql_statement = sql.data; /* * If the table distributed by value, check if we can reduce the datanodes diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index 67fa618f05..bdf174440c 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -145,7 +145,7 @@ static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection, static TupleTableSlot * RemoteQueryNext(RemoteQueryState *node); static char *generate_begin_command(void); -static bool pgxc_node_remote_prepare(char *prepareGID); +static bool pgxc_node_remote_prepare(char *prepareGID, bool implicit); static void pgxc_node_remote_commit(void); static void pgxc_node_remote_abort(void); static char *pgxc_node_get_nodelist(bool localNode); @@ -1556,20 +1556,34 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections, * prepareGID is created and passed from xact.c */ static bool -pgxc_node_remote_prepare(char *prepareGID) +pgxc_node_remote_prepare(char *prepareGID, bool implicit) { - int result = 0; - int write_conn_count = remoteXactState.numWriteRemoteNodes; + int result = 0; + int write_conn_count = remoteXactState.numWriteRemoteNodes; + int read_conn_count = remoteXactState.numReadRemoteNodes; char prepare_cmd[256]; - int i; - PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles; - RemoteQueryState *combiner = NULL; + char commit_cmd[256]; + char *cmd_to_send; + int i; + PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles; + RemoteQueryState *combiner = NULL; + RemoteXactNodeStatus rxns; + RemoteXactNodeStatus rxns_ok; + RemoteXactStatus rxs; - /* - * If there is NO write activity or the caller does not want us to run a - * 2PC protocol, we don't need to do anything special + /* In case of implicit prepare nothing to do on read only nodes */ + if (implicit) + read_conn_count = 0; + + /* + * Was begin sent to any node? + * if no, no point sending either prepare transaction + * or commit transaction to any of them + * Probably user did a begin and immediately followed it by + * a prepare transaction + * Return also if the caller does not want to run 2PC protocol */ - if ((write_conn_count == 0) || (prepareGID == NULL)) + if (((read_conn_count + write_conn_count) == 0) || prepareGID == NULL) return false; /* Save the prepareGID in the global state information */ @@ -1578,8 +1592,33 @@ pgxc_node_remote_prepare(char *prepareGID) /* Generate the PREPARE TRANSACTION command */ sprintf(prepare_cmd, "PREPARE TRANSACTION '%s'", remoteXactState.prepareGID); - for (i = 0; i < write_conn_count; i++) + /* + * Generate the COMMIT TRANSACTION command for nodes + * where this transaction was not prepared because + * nothing was written to them by this transaction + * but transaction was started on those nodes + * so it has to be ended gracefully + */ + sprintf(commit_cmd, "COMMIT TRANSACTION"); + + cmd_to_send = (char *)&prepare_cmd; + rxns = RXACT_NODE_PREPARE_FAILED; + rxs = RXACT_PREPARE_FAILED; + rxns_ok = RXACT_NODE_PREPARE_SENT; + for (i = 0; i < write_conn_count + read_conn_count; i++) { + /* + * As long as we are dealing with write nodes send prepare transaction, + * otherwise commit transaction + */ + if (i >= write_conn_count) + { + cmd_to_send = (char *)&commit_cmd; + rxns = RXACT_NODE_COMMIT_FAILED; + rxs = RXACT_COMMIT_FAILED; + rxns_ok = RXACT_NODE_COMMIT_SENT; + } + /* * PGXCTODO - We should actually make sure that the connection state is * IDLE when we reach here. The executor should have guaranteed that @@ -1596,10 +1635,10 @@ pgxc_node_remote_prepare(char *prepareGID) * Now we are ready to PREPARE the transaction. Any error at this point * can be safely ereport-ed and the transaction will be aborted. */ - if (pgxc_node_send_query(connections[i], prepare_cmd)) + if (pgxc_node_send_query(connections[i], cmd_to_send)) { - remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; - remoteXactState.status = RXACT_PREPARE_FAILED; + remoteXactState.remoteNodeStatus[i] = rxns; + remoteXactState.status = rxs; ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("failed to send PREPARE TRANSACTION command to " @@ -1607,7 +1646,7 @@ pgxc_node_remote_prepare(char *prepareGID) } else { - remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_SENT; + remoteXactState.remoteNodeStatus[i] = rxns_ok; /* Let the HandleCommandComplete know response checking is enable */ connections[i]->ck_resp_rollback = RESP_ROLLBACK_CHECK; } @@ -1626,11 +1665,11 @@ pgxc_node_remote_prepare(char *prepareGID) * good for parallel proessing, but I think we should have a leak-proof * mechanism to track connection status */ - if (write_conn_count) + if (write_conn_count + read_conn_count) { - combiner = CreateResponseCombiner(write_conn_count, COMBINE_TYPE_NONE); + combiner = CreateResponseCombiner(write_conn_count + read_conn_count, COMBINE_TYPE_NONE); /* Receive responses */ - result = pgxc_node_receive_responses(write_conn_count, connections, NULL, combiner); + result = pgxc_node_receive_responses(write_conn_count + read_conn_count, connections, NULL, combiner); if (result || !validate_combiner(combiner)) result = EOF; else @@ -1639,28 +1678,46 @@ pgxc_node_remote_prepare(char *prepareGID) combiner = NULL; } - for (i = 0; i < write_conn_count; i++) + rxns = RXACT_NODE_PREPARE_FAILED; + rxs = RXACT_PREPARE_FAILED; + rxns_ok = RXACT_NODE_PREPARED; + for (i = 0; i < write_conn_count + read_conn_count; i++) { - if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT) + /* + * As long as we are dealing with write nodes error status + * will be for prepare transaction, otherwise commit transaction + */ + if (i >= write_conn_count) + { + rxns = RXACT_NODE_COMMIT_FAILED; + rxs = RXACT_COMMIT_FAILED; + rxns_ok = RXACT_NODE_COMMITTED; + } + + if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT || + remoteXactState.remoteNodeStatus[i] == RXACT_NODE_COMMIT_SENT) { if (connections[i]->error) { - remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; - remoteXactState.status = RXACT_PREPARE_FAILED; + remoteXactState.remoteNodeStatus[i] = rxns; + remoteXactState.status = rxs; } else { /* Did we receive ROLLBACK in response to PREPARE TRANSCATION? */ if (connections[i]->ck_resp_rollback == RESP_ROLLBACK_RECEIVED) { - /* If yes, it means PREPARE TRANSACTION failed */ - remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED; - remoteXactState.status = RXACT_PREPARE_FAILED; + /* + * If yes, it means PREPARE TRANSACTION or COMMIT TRANSACTION failed + * report error accordingly + */ + remoteXactState.remoteNodeStatus[i] = rxns; + remoteXactState.status = rxs; result = 0; } else { - remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARED; + remoteXactState.remoteNodeStatus[i] = rxns_ok; } } } @@ -1680,7 +1737,8 @@ pgxc_node_remote_prepare(char *prepareGID) elog(ERROR, "failed to PREPARE transaction on one or more nodes"); } - if (remoteXactState.status == RXACT_PREPARE_FAILED) + if (remoteXactState.status == RXACT_PREPARE_FAILED || + remoteXactState.status == RXACT_COMMIT_FAILED ) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to PREPARE the transaction on one or more nodes"))); @@ -4342,7 +4400,7 @@ PrePrepare_Remote(char *prepareGID, bool localNode, bool implicit) * local node. Any errors will be reported via ereport and the transaction * will be aborted accordingly. */ - pgxc_node_remote_prepare(prepareGID); + pgxc_node_remote_prepare(prepareGID, implicit); if (preparedNodes) pfree(preparedNodes); diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h index c9310b4b0e..06f53e7ce8 100644 --- a/src/include/pgxc/planner.h +++ b/src/include/pgxc/planner.h @@ -80,36 +80,36 @@ typedef enum */ typedef struct { - Scan scan; - ExecDirectType exec_direct_type; /* track if remote query is execute direct and what type it is */ - char *sql_statement; - ExecNodes *exec_nodes; /* List of Datanodes where to launch query */ - CombineType combine_type; - SimpleSort *sort; - bool read_only; /* do not use 2PC when committing read only steps */ - bool force_autocommit; /* some commands like VACUUM require autocommit mode */ - char *statement; /* if specified use it as a PreparedStatement name on data nodes */ - char *cursor; /* if specified use it as a Portal name on data nodes */ - int num_params; /* number of parameters specified for Prepared statement */ - Oid *param_types; /* parameter types, this pointer is shared - * across all the RemoteQuery nodes in the - * plan. So, don't change this once set. - */ - RemoteQueryExecType exec_type; - bool is_temp; /* determine if this remote node is based - * on a temporary objects (no 2PC) */ - - int reduce_level; /* in case of reduced JOIN, it's level */ - List *base_tlist; /* in case of isReduced, the base tlist */ - char *outer_alias; - char *inner_alias; - int outer_reduce_level; - int inner_reduce_level; - Relids outer_relids; - Relids inner_relids; - char *inner_statement; - char *outer_statement; - char *join_condition; + Scan scan; + ExecDirectType exec_direct_type; /* track if remote query is execute direct and what type it is */ + char *sql_statement; + ExecNodes *exec_nodes; /* List of Datanodes where to launch query */ + CombineType combine_type; + SimpleSort *sort; + bool read_only; /* do not use 2PC when committing read only steps */ + bool force_autocommit; /* some commands like VACUUM require autocommit mode */ + char *statement; /* if specified use it as a PreparedStatement name on data nodes */ + char *cursor; /* if specified use it as a Portal name on data nodes */ + int num_params; /* number of parameters specified for Prepared statement */ + Oid *param_types; /* parameter types, this pointer is shared + * across all the RemoteQuery nodes in the + * plan. So, don't change this once set. + */ + RemoteQueryExecType exec_type; + bool is_temp; /* determine if this remote node is based + * on a temporary objects (no 2PC) */ + + int reduce_level; /* in case of reduced JOIN, it's level */ + List *base_tlist; /* in case of isReduced, the base tlist */ + char *outer_alias; + char *inner_alias; + int outer_reduce_level; + int inner_reduce_level; + Relids outer_relids; + Relids inner_relids; + char *inner_statement; + char *outer_statement; + char *join_condition; } RemoteQuery; /* diff --git a/src/test/regress/expected/xc_for_update.out b/src/test/regress/expected/xc_for_update.out index ba13280c32..d69ade5466 100644 --- a/src/test/regress/expected/xc_for_update.out +++ b/src/test/regress/expected/xc_for_update.out @@ -625,8 +625,7 @@ explain (num_nodes off, nodes off, verbose on) select * from c1 for update; -- drop objects created drop table c1; -drop table c1; -ERROR: table "c1" does not exist +drop table p1; drop view v1; drop table t1; drop table t2; diff --git a/src/test/regress/expected/xc_prepared_xacts.out b/src/test/regress/expected/xc_prepared_xacts.out new file mode 100644 index 0000000000..6ddb2e44f8 --- /dev/null +++ b/src/test/regress/expected/xc_prepared_xacts.out @@ -0,0 +1,457 @@ +-- A function to return data node name given a node number +create or replace function get_xc_node_name(node_num int) returns varchar language plpgsql as $$ +declare + r pgxc_node%rowtype; + node int; + nodenames_query varchar; +begin + nodenames_query := 'SELECT * FROM pgxc_node WHERE node_type = ''D'' ORDER BY xc_node_id'; + + node := 1; + for r in execute nodenames_query loop + if node = node_num THEN + RETURN r.node_name; + end if; + node := node + 1; + end loop; + RETURN 'NODE_?'; +end; +$$; +-- A function to check whether a certain transaction was prepared on a specific data node given its number +create or replace function is_prepared_on_node(txn_id varchar, nodenum int) returns bool language plpgsql as $$ +declare + nodename varchar; + qry varchar; + r pg_prepared_xacts%rowtype; +begin + nodename := (SELECT get_xc_node_name(nodenum)); + qry := 'execute direct on ' || nodename || ' ' || chr(39) || 'select * from pg_prepared_xacts' || chr(39); + + for r in execute qry loop + if r.gid = txn_id THEN + RETURN true; + end if; + end loop; + return false; +end; +$$; +set enable_fast_query_shipping=true; +-- Test to make sure prepared transactions are working as expected +-- If a transcation is preared and contains only a select, it should NOT be preapred on data nodes +-- create some tables +create table t1(val int, val2 int) DISTRIBUTE BY REPLICATION; +create table t2(val int, val2 int) DISTRIBUTE BY REPLICATION; +create table t3(val int, val2 int) DISTRIBUTE BY REPLICATION; +create table p1(a int, b int) DISTRIBUTE BY REPLICATION; +create table c1(d int, e int) inherits (p1) DISTRIBUTE BY REPLICATION; +-- insert some rows in them +insert into t1 values(1,11),(2,11); +insert into t2 values(3,11),(4,11); +insert into t3 values(5,11),(6,11); +insert into p1 values(55,66),(77,88); +insert into c1 values(111,222,333,444),(123,345,567,789); +-- **** +begin; + select * from t1 order by val; + val | val2 +-----+------ + 1 | 11 + 2 | 11 +(2 rows) + + select * from t2 order by val; + val | val2 +-----+------ + 3 | 11 + 4 | 11 +(2 rows) + + select * from t3 order by val; + val | val2 +-----+------ + 5 | 11 + 6 | 11 +(2 rows) + + select * from p1 order by a; + a | b +-----+----- + 55 | 66 + 77 | 88 + 111 | 222 + 123 | 345 +(4 rows) + + select * from c1 order by a; + a | b | d | e +-----+-----+-----+----- + 111 | 222 | 333 | 444 + 123 | 345 | 567 | 789 +(2 rows) + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- **** +begin; + insert into t3 values(7,11); + insert into t3 values(8,11); + insert into t3 values(9,11); + insert into t3 values(0,11); +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- true + is_prepared_on_node +--------------------- + t +(1 row) + +commit prepared 'pt_1'; +select * from t3 order by val; + val | val2 +-----+------ + 0 | 11 + 5 | 11 + 6 | 11 + 7 | 11 + 8 | 11 + 9 | 11 +(6 rows) + +-- **** +begin; + update t3 set val2 = 22; +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- true + is_prepared_on_node +--------------------- + t +(1 row) + +commit prepared 'pt_1'; +select * from t3 order by val; + val | val2 +-----+------ + 0 | 22 + 5 | 22 + 6 | 22 + 7 | 22 + 8 | 22 + 9 | 22 +(6 rows) + +-- **** +begin; + delete from t3 where val = 0; +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- true + is_prepared_on_node +--------------------- + t +(1 row) + +commit prepared 'pt_1'; +select * from t3 order by val; + val | val2 +-----+------ + 5 | 22 + 6 | 22 + 7 | 22 + 8 | 22 + 9 | 22 +(5 rows) + +-- **** +begin; + WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1; + val | val2 +-----+------ + 1 | 11 + 2 | 11 +(2 rows) + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- **** +begin; + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- **** +begin; + select * from t1, t2 where t1.val = t2.val; + val | val2 | val | val2 +-----+------+-----+------ +(0 rows) + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- ********************************** +-- repeat all tests with FQS disabled +-- ********************************** +delete from t3; +set enable_fast_query_shipping=false; +-- **** +begin; + select * from t1 order by val; + val | val2 +-----+------ + 1 | 11 + 2 | 11 +(2 rows) + + select * from t2 order by val; + val | val2 +-----+------ + 3 | 11 + 4 | 11 +(2 rows) + + select * from t3 order by val; + val | val2 +-----+------ +(0 rows) + + select * from p1 order by a; + a | b +-----+----- + 55 | 66 + 77 | 88 + 111 | 222 + 123 | 345 +(4 rows) + + select * from c1 order by a; + a | b | d | e +-----+-----+-----+----- + 111 | 222 | 333 | 444 + 123 | 345 | 567 | 789 +(2 rows) + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- **** +begin; + insert into t3 values(7,11); + insert into t3 values(8,11); + insert into t3 values(9,11); + insert into t3 values(0,11); +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- true + is_prepared_on_node +--------------------- + t +(1 row) + +commit prepared 'pt_1'; +select * from t3 order by val; + val | val2 +-----+------ + 0 | 11 + 7 | 11 + 8 | 11 + 9 | 11 +(4 rows) + +-- **** +begin; + update t3 set val2 = 22; +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- true + is_prepared_on_node +--------------------- + t +(1 row) + +commit prepared 'pt_1'; +select * from t3 order by val; + val | val2 +-----+------ + 0 | 22 + 7 | 22 + 8 | 22 + 9 | 22 +(4 rows) + +-- **** +begin; + delete from t3 where val = 7; +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- true + is_prepared_on_node +--------------------- + t +(1 row) + +commit prepared 'pt_1'; +select * from t3 order by val; + val | val2 +-----+------ + 0 | 22 + 8 | 22 + 9 | 22 +(3 rows) + +-- **** +begin; + WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1; + val | val2 +-----+------ + 1 | 11 + 2 | 11 +(2 rows) + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- **** +begin; + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- **** +begin; + select * from t1, t2 where t1.val = t2.val; + val | val2 | val | val2 +-----+------+-----+------ +(0 rows) + +prepare transaction 'pt_1'; +select gid from pg_prepared_xacts where gid = 'pt_1'; + gid +------ + pt_1 +(1 row) + +select is_prepared_on_node('pt_1', 1); -- false + is_prepared_on_node +--------------------- + f +(1 row) + +commit prepared 'pt_1'; +-- **** +set enable_fast_query_shipping=true; +-- drop objects created +drop table c1; +drop table p1; +drop table t1; +drop table t2; +drop table t3; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 835d483f7f..8057d9f90a 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -109,3 +109,6 @@ test: xc_groupby xc_distkey xc_having xc_temp xc_remote xc_FQS xc_FQS_join xc_co #Cluster setting related test is independant test: xc_node test: xc_misc + +#Additional tests for prepared xacts +test: xc_prepared_xacts diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 8b4d7f9290..e3b4736902 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -140,3 +140,4 @@ test: xc_copy test: xc_for_update test: xc_alter_table test: xc_sequence +test: xc_prepared_xacts diff --git a/src/test/regress/sql/xc_for_update.sql b/src/test/regress/sql/xc_for_update.sql index e64eb5d1ce..d512dd7592 100644 --- a/src/test/regress/sql/xc_for_update.sql +++ b/src/test/regress/sql/xc_for_update.sql @@ -94,7 +94,7 @@ explain (num_nodes off, nodes off, verbose on) select * from c1 for update; -- drop objects created drop table c1; -drop table c1; +drop table p1; drop view v1; drop table t1; drop table t2; diff --git a/src/test/regress/sql/xc_prepared_xacts.sql b/src/test/regress/sql/xc_prepared_xacts.sql new file mode 100644 index 0000000000..9fc1f8da1b --- /dev/null +++ b/src/test/regress/sql/xc_prepared_xacts.sql @@ -0,0 +1,262 @@ + +-- A function to return data node name given a node number +create or replace function get_xc_node_name(node_num int) returns varchar language plpgsql as $$ +declare + r pgxc_node%rowtype; + node int; + nodenames_query varchar; +begin + nodenames_query := 'SELECT * FROM pgxc_node WHERE node_type = ''D'' ORDER BY xc_node_id'; + + node := 1; + for r in execute nodenames_query loop + if node = node_num THEN + RETURN r.node_name; + end if; + node := node + 1; + end loop; + RETURN 'NODE_?'; +end; +$$; + + +-- A function to check whether a certain transaction was prepared on a specific data node given its number +create or replace function is_prepared_on_node(txn_id varchar, nodenum int) returns bool language plpgsql as $$ +declare + nodename varchar; + qry varchar; + r pg_prepared_xacts%rowtype; +begin + nodename := (SELECT get_xc_node_name(nodenum)); + qry := 'execute direct on ' || nodename || ' ' || chr(39) || 'select * from pg_prepared_xacts' || chr(39); + + for r in execute qry loop + if r.gid = txn_id THEN + RETURN true; + end if; + end loop; + return false; +end; +$$; + +set enable_fast_query_shipping=true; + + +-- Test to make sure prepared transactions are working as expected +-- If a transcation is preared and contains only a select, it should NOT be preapred on data nodes + +-- create some tables +create table t1(val int, val2 int) DISTRIBUTE BY REPLICATION; +create table t2(val int, val2 int) DISTRIBUTE BY REPLICATION; +create table t3(val int, val2 int) DISTRIBUTE BY REPLICATION; + +create table p1(a int, b int) DISTRIBUTE BY REPLICATION; +create table c1(d int, e int) inherits (p1) DISTRIBUTE BY REPLICATION; + +-- insert some rows in them +insert into t1 values(1,11),(2,11); +insert into t2 values(3,11),(4,11); +insert into t3 values(5,11),(6,11); + +insert into p1 values(55,66),(77,88); +insert into c1 values(111,222,333,444),(123,345,567,789); + +-- **** + +begin; + select * from t1 order by val; + select * from t2 order by val; + select * from t3 order by val; + select * from p1 order by a; + select * from c1 order by a; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- **** + +begin; + insert into t3 values(7,11); + insert into t3 values(8,11); + insert into t3 values(9,11); + insert into t3 values(0,11); +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- true + +commit prepared 'pt_1'; + +select * from t3 order by val; + +-- **** + +begin; + update t3 set val2 = 22; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- true + +commit prepared 'pt_1'; + +select * from t3 order by val; + +-- **** + +begin; + delete from t3 where val = 0; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- true + +commit prepared 'pt_1'; + +select * from t3 order by val; + +-- **** + +begin; + WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- **** + +begin; + +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- **** + +begin; + select * from t1, t2 where t1.val = t2.val; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- ********************************** +-- repeat all tests with FQS disabled +-- ********************************** + +delete from t3; + +set enable_fast_query_shipping=false; + +-- **** + +begin; + select * from t1 order by val; + select * from t2 order by val; + select * from t3 order by val; + select * from p1 order by a; + select * from c1 order by a; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- **** + +begin; + insert into t3 values(7,11); + insert into t3 values(8,11); + insert into t3 values(9,11); + insert into t3 values(0,11); +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- true + +commit prepared 'pt_1'; + +select * from t3 order by val; + +-- **** + +begin; + update t3 set val2 = 22; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- true + +commit prepared 'pt_1'; + +select * from t3 order by val; + +-- **** + +begin; + delete from t3 where val = 7; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- true + +commit prepared 'pt_1'; + +select * from t3 order by val; + +-- **** + +begin; + WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- **** + +begin; + +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- **** + +begin; + select * from t1, t2 where t1.val = t2.val; +prepare transaction 'pt_1'; + +select gid from pg_prepared_xacts where gid = 'pt_1'; +select is_prepared_on_node('pt_1', 1); -- false + +commit prepared 'pt_1'; + +-- **** + +set enable_fast_query_shipping=true; + +-- drop objects created +drop table c1; +drop table p1; +drop table t1; +drop table t2; +drop table t3; + |