summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/optimizer/plan/createplan.c2
-rw-r--r--src/backend/pgxc/pool/execRemote.c116
-rw-r--r--src/include/pgxc/planner.h60
-rw-r--r--src/test/regress/expected/xc_for_update.out3
-rw-r--r--src/test/regress/expected/xc_prepared_xacts.out457
-rw-r--r--src/test/regress/parallel_schedule3
-rw-r--r--src/test/regress/serial_schedule1
-rw-r--r--src/test/regress/sql/xc_for_update.sql2
-rw-r--r--src/test/regress/sql/xc_prepared_xacts.sql262
9 files changed, 844 insertions, 62 deletions
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index f5939c2597..2014a73b9e 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -2773,6 +2773,8 @@ create_remotequery_plan(PlannerInfo *root, Path *best_path,
/* Track if the remote query involves a temporary object */
scan_plan->is_temp = IsTempTable(rte->relid);
+ scan_plan->read_only = query->commandType == CMD_SELECT;
+
scan_plan->sql_statement = sql.data;
/*
* If the table distributed by value, check if we can reduce the datanodes
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 67fa618f05..bdf174440c 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -145,7 +145,7 @@ static bool pgxc_start_command_on_connection(PGXCNodeHandle *connection,
static TupleTableSlot * RemoteQueryNext(RemoteQueryState *node);
static char *generate_begin_command(void);
-static bool pgxc_node_remote_prepare(char *prepareGID);
+static bool pgxc_node_remote_prepare(char *prepareGID, bool implicit);
static void pgxc_node_remote_commit(void);
static void pgxc_node_remote_abort(void);
static char *pgxc_node_get_nodelist(bool localNode);
@@ -1556,20 +1556,34 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle **connections,
* prepareGID is created and passed from xact.c
*/
static bool
-pgxc_node_remote_prepare(char *prepareGID)
+pgxc_node_remote_prepare(char *prepareGID, bool implicit)
{
- int result = 0;
- int write_conn_count = remoteXactState.numWriteRemoteNodes;
+ int result = 0;
+ int write_conn_count = remoteXactState.numWriteRemoteNodes;
+ int read_conn_count = remoteXactState.numReadRemoteNodes;
char prepare_cmd[256];
- int i;
- PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles;
- RemoteQueryState *combiner = NULL;
+ char commit_cmd[256];
+ char *cmd_to_send;
+ int i;
+ PGXCNodeHandle **connections = remoteXactState.remoteNodeHandles;
+ RemoteQueryState *combiner = NULL;
+ RemoteXactNodeStatus rxns;
+ RemoteXactNodeStatus rxns_ok;
+ RemoteXactStatus rxs;
- /*
- * If there is NO write activity or the caller does not want us to run a
- * 2PC protocol, we don't need to do anything special
+ /* In case of implicit prepare nothing to do on read only nodes */
+ if (implicit)
+ read_conn_count = 0;
+
+ /*
+ * Was begin sent to any node?
+ * if no, no point sending either prepare transaction
+ * or commit transaction to any of them
+ * Probably user did a begin and immediately followed it by
+ * a prepare transaction
+ * Return also if the caller does not want to run 2PC protocol
*/
- if ((write_conn_count == 0) || (prepareGID == NULL))
+ if (((read_conn_count + write_conn_count) == 0) || prepareGID == NULL)
return false;
/* Save the prepareGID in the global state information */
@@ -1578,8 +1592,33 @@ pgxc_node_remote_prepare(char *prepareGID)
/* Generate the PREPARE TRANSACTION command */
sprintf(prepare_cmd, "PREPARE TRANSACTION '%s'", remoteXactState.prepareGID);
- for (i = 0; i < write_conn_count; i++)
+ /*
+ * Generate the COMMIT TRANSACTION command for nodes
+ * where this transaction was not prepared because
+ * nothing was written to them by this transaction
+ * but transaction was started on those nodes
+ * so it has to be ended gracefully
+ */
+ sprintf(commit_cmd, "COMMIT TRANSACTION");
+
+ cmd_to_send = (char *)&prepare_cmd;
+ rxns = RXACT_NODE_PREPARE_FAILED;
+ rxs = RXACT_PREPARE_FAILED;
+ rxns_ok = RXACT_NODE_PREPARE_SENT;
+ for (i = 0; i < write_conn_count + read_conn_count; i++)
{
+ /*
+ * As long as we are dealing with write nodes send prepare transaction,
+ * otherwise commit transaction
+ */
+ if (i >= write_conn_count)
+ {
+ cmd_to_send = (char *)&commit_cmd;
+ rxns = RXACT_NODE_COMMIT_FAILED;
+ rxs = RXACT_COMMIT_FAILED;
+ rxns_ok = RXACT_NODE_COMMIT_SENT;
+ }
+
/*
* PGXCTODO - We should actually make sure that the connection state is
* IDLE when we reach here. The executor should have guaranteed that
@@ -1596,10 +1635,10 @@ pgxc_node_remote_prepare(char *prepareGID)
* Now we are ready to PREPARE the transaction. Any error at this point
* can be safely ereport-ed and the transaction will be aborted.
*/
- if (pgxc_node_send_query(connections[i], prepare_cmd))
+ if (pgxc_node_send_query(connections[i], cmd_to_send))
{
- remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED;
- remoteXactState.status = RXACT_PREPARE_FAILED;
+ remoteXactState.remoteNodeStatus[i] = rxns;
+ remoteXactState.status = rxs;
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("failed to send PREPARE TRANSACTION command to "
@@ -1607,7 +1646,7 @@ pgxc_node_remote_prepare(char *prepareGID)
}
else
{
- remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_SENT;
+ remoteXactState.remoteNodeStatus[i] = rxns_ok;
/* Let the HandleCommandComplete know response checking is enable */
connections[i]->ck_resp_rollback = RESP_ROLLBACK_CHECK;
}
@@ -1626,11 +1665,11 @@ pgxc_node_remote_prepare(char *prepareGID)
* good for parallel proessing, but I think we should have a leak-proof
* mechanism to track connection status
*/
- if (write_conn_count)
+ if (write_conn_count + read_conn_count)
{
- combiner = CreateResponseCombiner(write_conn_count, COMBINE_TYPE_NONE);
+ combiner = CreateResponseCombiner(write_conn_count + read_conn_count, COMBINE_TYPE_NONE);
/* Receive responses */
- result = pgxc_node_receive_responses(write_conn_count, connections, NULL, combiner);
+ result = pgxc_node_receive_responses(write_conn_count + read_conn_count, connections, NULL, combiner);
if (result || !validate_combiner(combiner))
result = EOF;
else
@@ -1639,28 +1678,46 @@ pgxc_node_remote_prepare(char *prepareGID)
combiner = NULL;
}
- for (i = 0; i < write_conn_count; i++)
+ rxns = RXACT_NODE_PREPARE_FAILED;
+ rxs = RXACT_PREPARE_FAILED;
+ rxns_ok = RXACT_NODE_PREPARED;
+ for (i = 0; i < write_conn_count + read_conn_count; i++)
{
- if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT)
+ /*
+ * As long as we are dealing with write nodes error status
+ * will be for prepare transaction, otherwise commit transaction
+ */
+ if (i >= write_conn_count)
+ {
+ rxns = RXACT_NODE_COMMIT_FAILED;
+ rxs = RXACT_COMMIT_FAILED;
+ rxns_ok = RXACT_NODE_COMMITTED;
+ }
+
+ if (remoteXactState.remoteNodeStatus[i] == RXACT_NODE_PREPARE_SENT ||
+ remoteXactState.remoteNodeStatus[i] == RXACT_NODE_COMMIT_SENT)
{
if (connections[i]->error)
{
- remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED;
- remoteXactState.status = RXACT_PREPARE_FAILED;
+ remoteXactState.remoteNodeStatus[i] = rxns;
+ remoteXactState.status = rxs;
}
else
{
/* Did we receive ROLLBACK in response to PREPARE TRANSCATION? */
if (connections[i]->ck_resp_rollback == RESP_ROLLBACK_RECEIVED)
{
- /* If yes, it means PREPARE TRANSACTION failed */
- remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARE_FAILED;
- remoteXactState.status = RXACT_PREPARE_FAILED;
+ /*
+ * If yes, it means PREPARE TRANSACTION or COMMIT TRANSACTION failed
+ * report error accordingly
+ */
+ remoteXactState.remoteNodeStatus[i] = rxns;
+ remoteXactState.status = rxs;
result = 0;
}
else
{
- remoteXactState.remoteNodeStatus[i] = RXACT_NODE_PREPARED;
+ remoteXactState.remoteNodeStatus[i] = rxns_ok;
}
}
}
@@ -1680,7 +1737,8 @@ pgxc_node_remote_prepare(char *prepareGID)
elog(ERROR, "failed to PREPARE transaction on one or more nodes");
}
- if (remoteXactState.status == RXACT_PREPARE_FAILED)
+ if (remoteXactState.status == RXACT_PREPARE_FAILED ||
+ remoteXactState.status == RXACT_COMMIT_FAILED )
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to PREPARE the transaction on one or more nodes")));
@@ -4342,7 +4400,7 @@ PrePrepare_Remote(char *prepareGID, bool localNode, bool implicit)
* local node. Any errors will be reported via ereport and the transaction
* will be aborted accordingly.
*/
- pgxc_node_remote_prepare(prepareGID);
+ pgxc_node_remote_prepare(prepareGID, implicit);
if (preparedNodes)
pfree(preparedNodes);
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index c9310b4b0e..06f53e7ce8 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -80,36 +80,36 @@ typedef enum
*/
typedef struct
{
- Scan scan;
- ExecDirectType exec_direct_type; /* track if remote query is execute direct and what type it is */
- char *sql_statement;
- ExecNodes *exec_nodes; /* List of Datanodes where to launch query */
- CombineType combine_type;
- SimpleSort *sort;
- bool read_only; /* do not use 2PC when committing read only steps */
- bool force_autocommit; /* some commands like VACUUM require autocommit mode */
- char *statement; /* if specified use it as a PreparedStatement name on data nodes */
- char *cursor; /* if specified use it as a Portal name on data nodes */
- int num_params; /* number of parameters specified for Prepared statement */
- Oid *param_types; /* parameter types, this pointer is shared
- * across all the RemoteQuery nodes in the
- * plan. So, don't change this once set.
- */
- RemoteQueryExecType exec_type;
- bool is_temp; /* determine if this remote node is based
- * on a temporary objects (no 2PC) */
-
- int reduce_level; /* in case of reduced JOIN, it's level */
- List *base_tlist; /* in case of isReduced, the base tlist */
- char *outer_alias;
- char *inner_alias;
- int outer_reduce_level;
- int inner_reduce_level;
- Relids outer_relids;
- Relids inner_relids;
- char *inner_statement;
- char *outer_statement;
- char *join_condition;
+ Scan scan;
+ ExecDirectType exec_direct_type; /* track if remote query is execute direct and what type it is */
+ char *sql_statement;
+ ExecNodes *exec_nodes; /* List of Datanodes where to launch query */
+ CombineType combine_type;
+ SimpleSort *sort;
+ bool read_only; /* do not use 2PC when committing read only steps */
+ bool force_autocommit; /* some commands like VACUUM require autocommit mode */
+ char *statement; /* if specified use it as a PreparedStatement name on data nodes */
+ char *cursor; /* if specified use it as a Portal name on data nodes */
+ int num_params; /* number of parameters specified for Prepared statement */
+ Oid *param_types; /* parameter types, this pointer is shared
+ * across all the RemoteQuery nodes in the
+ * plan. So, don't change this once set.
+ */
+ RemoteQueryExecType exec_type;
+ bool is_temp; /* determine if this remote node is based
+ * on a temporary objects (no 2PC) */
+
+ int reduce_level; /* in case of reduced JOIN, it's level */
+ List *base_tlist; /* in case of isReduced, the base tlist */
+ char *outer_alias;
+ char *inner_alias;
+ int outer_reduce_level;
+ int inner_reduce_level;
+ Relids outer_relids;
+ Relids inner_relids;
+ char *inner_statement;
+ char *outer_statement;
+ char *join_condition;
} RemoteQuery;
/*
diff --git a/src/test/regress/expected/xc_for_update.out b/src/test/regress/expected/xc_for_update.out
index ba13280c32..d69ade5466 100644
--- a/src/test/regress/expected/xc_for_update.out
+++ b/src/test/regress/expected/xc_for_update.out
@@ -625,8 +625,7 @@ explain (num_nodes off, nodes off, verbose on) select * from c1 for update;
-- drop objects created
drop table c1;
-drop table c1;
-ERROR: table "c1" does not exist
+drop table p1;
drop view v1;
drop table t1;
drop table t2;
diff --git a/src/test/regress/expected/xc_prepared_xacts.out b/src/test/regress/expected/xc_prepared_xacts.out
new file mode 100644
index 0000000000..6ddb2e44f8
--- /dev/null
+++ b/src/test/regress/expected/xc_prepared_xacts.out
@@ -0,0 +1,457 @@
+-- A function to return data node name given a node number
+create or replace function get_xc_node_name(node_num int) returns varchar language plpgsql as $$
+declare
+ r pgxc_node%rowtype;
+ node int;
+ nodenames_query varchar;
+begin
+ nodenames_query := 'SELECT * FROM pgxc_node WHERE node_type = ''D'' ORDER BY xc_node_id';
+
+ node := 1;
+ for r in execute nodenames_query loop
+ if node = node_num THEN
+ RETURN r.node_name;
+ end if;
+ node := node + 1;
+ end loop;
+ RETURN 'NODE_?';
+end;
+$$;
+-- A function to check whether a certain transaction was prepared on a specific data node given its number
+create or replace function is_prepared_on_node(txn_id varchar, nodenum int) returns bool language plpgsql as $$
+declare
+ nodename varchar;
+ qry varchar;
+ r pg_prepared_xacts%rowtype;
+begin
+ nodename := (SELECT get_xc_node_name(nodenum));
+ qry := 'execute direct on ' || nodename || ' ' || chr(39) || 'select * from pg_prepared_xacts' || chr(39);
+
+ for r in execute qry loop
+ if r.gid = txn_id THEN
+ RETURN true;
+ end if;
+ end loop;
+ return false;
+end;
+$$;
+set enable_fast_query_shipping=true;
+-- Test to make sure prepared transactions are working as expected
+-- If a transcation is preared and contains only a select, it should NOT be preapred on data nodes
+-- create some tables
+create table t1(val int, val2 int) DISTRIBUTE BY REPLICATION;
+create table t2(val int, val2 int) DISTRIBUTE BY REPLICATION;
+create table t3(val int, val2 int) DISTRIBUTE BY REPLICATION;
+create table p1(a int, b int) DISTRIBUTE BY REPLICATION;
+create table c1(d int, e int) inherits (p1) DISTRIBUTE BY REPLICATION;
+-- insert some rows in them
+insert into t1 values(1,11),(2,11);
+insert into t2 values(3,11),(4,11);
+insert into t3 values(5,11),(6,11);
+insert into p1 values(55,66),(77,88);
+insert into c1 values(111,222,333,444),(123,345,567,789);
+-- ****
+begin;
+ select * from t1 order by val;
+ val | val2
+-----+------
+ 1 | 11
+ 2 | 11
+(2 rows)
+
+ select * from t2 order by val;
+ val | val2
+-----+------
+ 3 | 11
+ 4 | 11
+(2 rows)
+
+ select * from t3 order by val;
+ val | val2
+-----+------
+ 5 | 11
+ 6 | 11
+(2 rows)
+
+ select * from p1 order by a;
+ a | b
+-----+-----
+ 55 | 66
+ 77 | 88
+ 111 | 222
+ 123 | 345
+(4 rows)
+
+ select * from c1 order by a;
+ a | b | d | e
+-----+-----+-----+-----
+ 111 | 222 | 333 | 444
+ 123 | 345 | 567 | 789
+(2 rows)
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- ****
+begin;
+ insert into t3 values(7,11);
+ insert into t3 values(8,11);
+ insert into t3 values(9,11);
+ insert into t3 values(0,11);
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- true
+ is_prepared_on_node
+---------------------
+ t
+(1 row)
+
+commit prepared 'pt_1';
+select * from t3 order by val;
+ val | val2
+-----+------
+ 0 | 11
+ 5 | 11
+ 6 | 11
+ 7 | 11
+ 8 | 11
+ 9 | 11
+(6 rows)
+
+-- ****
+begin;
+ update t3 set val2 = 22;
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- true
+ is_prepared_on_node
+---------------------
+ t
+(1 row)
+
+commit prepared 'pt_1';
+select * from t3 order by val;
+ val | val2
+-----+------
+ 0 | 22
+ 5 | 22
+ 6 | 22
+ 7 | 22
+ 8 | 22
+ 9 | 22
+(6 rows)
+
+-- ****
+begin;
+ delete from t3 where val = 0;
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- true
+ is_prepared_on_node
+---------------------
+ t
+(1 row)
+
+commit prepared 'pt_1';
+select * from t3 order by val;
+ val | val2
+-----+------
+ 5 | 22
+ 6 | 22
+ 7 | 22
+ 8 | 22
+ 9 | 22
+(5 rows)
+
+-- ****
+begin;
+ WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1;
+ val | val2
+-----+------
+ 1 | 11
+ 2 | 11
+(2 rows)
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- ****
+begin;
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- ****
+begin;
+ select * from t1, t2 where t1.val = t2.val;
+ val | val2 | val | val2
+-----+------+-----+------
+(0 rows)
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- **********************************
+-- repeat all tests with FQS disabled
+-- **********************************
+delete from t3;
+set enable_fast_query_shipping=false;
+-- ****
+begin;
+ select * from t1 order by val;
+ val | val2
+-----+------
+ 1 | 11
+ 2 | 11
+(2 rows)
+
+ select * from t2 order by val;
+ val | val2
+-----+------
+ 3 | 11
+ 4 | 11
+(2 rows)
+
+ select * from t3 order by val;
+ val | val2
+-----+------
+(0 rows)
+
+ select * from p1 order by a;
+ a | b
+-----+-----
+ 55 | 66
+ 77 | 88
+ 111 | 222
+ 123 | 345
+(4 rows)
+
+ select * from c1 order by a;
+ a | b | d | e
+-----+-----+-----+-----
+ 111 | 222 | 333 | 444
+ 123 | 345 | 567 | 789
+(2 rows)
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- ****
+begin;
+ insert into t3 values(7,11);
+ insert into t3 values(8,11);
+ insert into t3 values(9,11);
+ insert into t3 values(0,11);
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- true
+ is_prepared_on_node
+---------------------
+ t
+(1 row)
+
+commit prepared 'pt_1';
+select * from t3 order by val;
+ val | val2
+-----+------
+ 0 | 11
+ 7 | 11
+ 8 | 11
+ 9 | 11
+(4 rows)
+
+-- ****
+begin;
+ update t3 set val2 = 22;
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- true
+ is_prepared_on_node
+---------------------
+ t
+(1 row)
+
+commit prepared 'pt_1';
+select * from t3 order by val;
+ val | val2
+-----+------
+ 0 | 22
+ 7 | 22
+ 8 | 22
+ 9 | 22
+(4 rows)
+
+-- ****
+begin;
+ delete from t3 where val = 7;
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- true
+ is_prepared_on_node
+---------------------
+ t
+(1 row)
+
+commit prepared 'pt_1';
+select * from t3 order by val;
+ val | val2
+-----+------
+ 0 | 22
+ 8 | 22
+ 9 | 22
+(3 rows)
+
+-- ****
+begin;
+ WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1;
+ val | val2
+-----+------
+ 1 | 11
+ 2 | 11
+(2 rows)
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- ****
+begin;
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- ****
+begin;
+ select * from t1, t2 where t1.val = t2.val;
+ val | val2 | val | val2
+-----+------+-----+------
+(0 rows)
+
+prepare transaction 'pt_1';
+select gid from pg_prepared_xacts where gid = 'pt_1';
+ gid
+------
+ pt_1
+(1 row)
+
+select is_prepared_on_node('pt_1', 1); -- false
+ is_prepared_on_node
+---------------------
+ f
+(1 row)
+
+commit prepared 'pt_1';
+-- ****
+set enable_fast_query_shipping=true;
+-- drop objects created
+drop table c1;
+drop table p1;
+drop table t1;
+drop table t2;
+drop table t3;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 835d483f7f..8057d9f90a 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -109,3 +109,6 @@ test: xc_groupby xc_distkey xc_having xc_temp xc_remote xc_FQS xc_FQS_join xc_co
#Cluster setting related test is independant
test: xc_node
test: xc_misc
+
+#Additional tests for prepared xacts
+test: xc_prepared_xacts
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 8b4d7f9290..e3b4736902 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -140,3 +140,4 @@ test: xc_copy
test: xc_for_update
test: xc_alter_table
test: xc_sequence
+test: xc_prepared_xacts
diff --git a/src/test/regress/sql/xc_for_update.sql b/src/test/regress/sql/xc_for_update.sql
index e64eb5d1ce..d512dd7592 100644
--- a/src/test/regress/sql/xc_for_update.sql
+++ b/src/test/regress/sql/xc_for_update.sql
@@ -94,7 +94,7 @@ explain (num_nodes off, nodes off, verbose on) select * from c1 for update;
-- drop objects created
drop table c1;
-drop table c1;
+drop table p1;
drop view v1;
drop table t1;
drop table t2;
diff --git a/src/test/regress/sql/xc_prepared_xacts.sql b/src/test/regress/sql/xc_prepared_xacts.sql
new file mode 100644
index 0000000000..9fc1f8da1b
--- /dev/null
+++ b/src/test/regress/sql/xc_prepared_xacts.sql
@@ -0,0 +1,262 @@
+
+-- A function to return data node name given a node number
+create or replace function get_xc_node_name(node_num int) returns varchar language plpgsql as $$
+declare
+ r pgxc_node%rowtype;
+ node int;
+ nodenames_query varchar;
+begin
+ nodenames_query := 'SELECT * FROM pgxc_node WHERE node_type = ''D'' ORDER BY xc_node_id';
+
+ node := 1;
+ for r in execute nodenames_query loop
+ if node = node_num THEN
+ RETURN r.node_name;
+ end if;
+ node := node + 1;
+ end loop;
+ RETURN 'NODE_?';
+end;
+$$;
+
+
+-- A function to check whether a certain transaction was prepared on a specific data node given its number
+create or replace function is_prepared_on_node(txn_id varchar, nodenum int) returns bool language plpgsql as $$
+declare
+ nodename varchar;
+ qry varchar;
+ r pg_prepared_xacts%rowtype;
+begin
+ nodename := (SELECT get_xc_node_name(nodenum));
+ qry := 'execute direct on ' || nodename || ' ' || chr(39) || 'select * from pg_prepared_xacts' || chr(39);
+
+ for r in execute qry loop
+ if r.gid = txn_id THEN
+ RETURN true;
+ end if;
+ end loop;
+ return false;
+end;
+$$;
+
+set enable_fast_query_shipping=true;
+
+
+-- Test to make sure prepared transactions are working as expected
+-- If a transcation is preared and contains only a select, it should NOT be preapred on data nodes
+
+-- create some tables
+create table t1(val int, val2 int) DISTRIBUTE BY REPLICATION;
+create table t2(val int, val2 int) DISTRIBUTE BY REPLICATION;
+create table t3(val int, val2 int) DISTRIBUTE BY REPLICATION;
+
+create table p1(a int, b int) DISTRIBUTE BY REPLICATION;
+create table c1(d int, e int) inherits (p1) DISTRIBUTE BY REPLICATION;
+
+-- insert some rows in them
+insert into t1 values(1,11),(2,11);
+insert into t2 values(3,11),(4,11);
+insert into t3 values(5,11),(6,11);
+
+insert into p1 values(55,66),(77,88);
+insert into c1 values(111,222,333,444),(123,345,567,789);
+
+-- ****
+
+begin;
+ select * from t1 order by val;
+ select * from t2 order by val;
+ select * from t3 order by val;
+ select * from p1 order by a;
+ select * from c1 order by a;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- ****
+
+begin;
+ insert into t3 values(7,11);
+ insert into t3 values(8,11);
+ insert into t3 values(9,11);
+ insert into t3 values(0,11);
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- true
+
+commit prepared 'pt_1';
+
+select * from t3 order by val;
+
+-- ****
+
+begin;
+ update t3 set val2 = 22;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- true
+
+commit prepared 'pt_1';
+
+select * from t3 order by val;
+
+-- ****
+
+begin;
+ delete from t3 where val = 0;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- true
+
+commit prepared 'pt_1';
+
+select * from t3 order by val;
+
+-- ****
+
+begin;
+ WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- ****
+
+begin;
+
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- ****
+
+begin;
+ select * from t1, t2 where t1.val = t2.val;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- **********************************
+-- repeat all tests with FQS disabled
+-- **********************************
+
+delete from t3;
+
+set enable_fast_query_shipping=false;
+
+-- ****
+
+begin;
+ select * from t1 order by val;
+ select * from t2 order by val;
+ select * from t3 order by val;
+ select * from p1 order by a;
+ select * from c1 order by a;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- ****
+
+begin;
+ insert into t3 values(7,11);
+ insert into t3 values(8,11);
+ insert into t3 values(9,11);
+ insert into t3 values(0,11);
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- true
+
+commit prepared 'pt_1';
+
+select * from t3 order by val;
+
+-- ****
+
+begin;
+ update t3 set val2 = 22;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- true
+
+commit prepared 'pt_1';
+
+select * from t3 order by val;
+
+-- ****
+
+begin;
+ delete from t3 where val = 7;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- true
+
+commit prepared 'pt_1';
+
+select * from t3 order by val;
+
+-- ****
+
+begin;
+ WITH q1 AS (SELECT * from t1 order by 1) SELECT * FROM q1;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- ****
+
+begin;
+
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- ****
+
+begin;
+ select * from t1, t2 where t1.val = t2.val;
+prepare transaction 'pt_1';
+
+select gid from pg_prepared_xacts where gid = 'pt_1';
+select is_prepared_on_node('pt_1', 1); -- false
+
+commit prepared 'pt_1';
+
+-- ****
+
+set enable_fast_query_shipping=true;
+
+-- drop objects created
+drop table c1;
+drop table p1;
+drop table t1;
+drop table t2;
+drop table t3;
+