summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael P2011-06-16 00:05:43 +0000
committerMichael P2011-06-16 00:05:43 +0000
commit88a19b42f3b599927b74e0aef2e19b9161b3a7eb (patch)
tree9522071fb3affa0309bebde19de9db1bbc80ccb5 /src
parentbcfa7b115f2a76e0146016dee36995df6eab89d0 (diff)
Support for START TRANSACTION
Until now XC was just sending down to Datanodes a plain BEGIN query each time a transaction was begun on backend nodes. This commit extends support of transaction start with isolation level and read operation customizations. Example of queries: START TRANSACTION ISOLATION LEVEL READ COMMITTED; START TRANSACTION READ WRITE; Regression test transaction is now fixed.
Diffstat (limited to 'src')
-rw-r--r--src/backend/pgxc/pool/execRemote.c59
-rw-r--r--src/backend/tcop/utility.c32
-rw-r--r--src/backend/utils/misc/guc.c40
-rw-r--r--src/include/pgxc/execRemote.h1
-rw-r--r--src/include/utils/guc.h4
5 files changed, 130 insertions, 6 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index ee02109a82..03482a0721 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -53,6 +53,7 @@ static bool is_ddl = false;
static bool implicit_force_autocommit = false;
static PGXCNodeHandle **write_node_list = NULL;
static int write_node_count = 0;
+static char *begin_string = NULL;
static int pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections,
GlobalTransactionId gxid);
@@ -1536,8 +1537,16 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections,
if (GlobalTimestampIsValid(timestamp) && pgxc_node_send_timestamp(connections[i], timestamp))
return EOF;
- if (pgxc_node_send_query(connections[i], "BEGIN"))
- return EOF;
+ if (begin_string)
+ {
+ if (pgxc_node_send_query(connections[i], begin_string))
+ return EOF;
+ }
+ else
+ {
+ if (pgxc_node_send_query(connections[i], "BEGIN"))
+ return EOF;
+ }
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
@@ -1573,6 +1582,22 @@ PGXCNodeBegin(void)
clear_write_node_list();
}
+void
+PGXCNodeSetBeginQuery(char *query_string)
+{
+ int len;
+
+ if (!query_string)
+ return;
+
+ len = strlen(query_string);
+ /*
+ * This query string is sent to backend nodes,
+ * it contains serializable and read options
+ */
+ begin_string = (char *)malloc(len + 1);
+ begin_string = memcpy(begin_string, query_string, len + 1);
+}
/*
* Prepare transaction on Datanodes and Coordinators involved in current transaction.
@@ -1624,6 +1649,11 @@ finish:
if (!PersistentConnections)
release_handles();
autocommit = true;
+ if (begin_string)
+ {
+ free(begin_string);
+ begin_string = NULL;
+ }
is_ddl = false;
clear_write_node_list();
@@ -1898,6 +1928,11 @@ finish:
if (!PersistentConnections && res == 0)
release_handles();
autocommit = true;
+ if (begin_string)
+ {
+ free(begin_string);
+ begin_string = NULL;
+ }
is_ddl = false;
clear_write_node_list();
@@ -2034,6 +2069,11 @@ finish:
if (!PersistentConnections)
release_handles();
autocommit = true;
+ if (begin_string)
+ {
+ free(begin_string);
+ begin_string = NULL;
+ }
is_ddl = false;
clear_write_node_list();
@@ -2158,6 +2198,11 @@ finish:
if (!PersistentConnections)
release_handles();
autocommit = true;
+ if (begin_string)
+ {
+ free(begin_string);
+ begin_string = NULL;
+ }
is_ddl = false;
clear_write_node_list();
@@ -2250,6 +2295,11 @@ finish:
if (!PersistentConnections && bReleaseHandles)
release_handles();
autocommit = true;
+ if (begin_string)
+ {
+ free(begin_string);
+ begin_string = NULL;
+ }
is_ddl = false;
clear_write_node_list();
@@ -2319,6 +2369,11 @@ finish:
if (!PersistentConnections)
release_handles();
autocommit = true;
+ if (begin_string)
+ {
+ free(begin_string);
+ begin_string = NULL;
+ }
is_ddl = false;
clear_write_node_list();
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 9e2e9d44bf..e9e7afd271 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -370,13 +370,10 @@ standard_ProcessUtility(Node *parsetree,
{
ListCell *lc;
#ifdef PGXC
- /*
- * If a COMMIT PREPARED message is received from another Coordinator,
- * Don't send it down to Datanodes.
- */
if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
PGXCNodeBegin();
#endif
+
BeginTransactionBlock();
foreach(lc, stmt->options)
{
@@ -391,6 +388,33 @@ standard_ProcessUtility(Node *parsetree,
list_make1(item->arg),
true);
}
+
+#ifdef PGXC
+ /*
+ * Now that all the local variables have been set,
+ * it is time to rebuild the query.
+ */
+ if (IS_PGXC_COORDINATOR && !IsConnFromCoord())
+ {
+ char *begin_string = NULL;
+
+ /* Result is palloc'd */
+ foreach(lc, stmt->options)
+ {
+ DefElem *item = (DefElem *) lfirst(lc);
+
+ if (strcmp(item->defname, "transaction_isolation") == 0)
+ begin_string = RewriteBeginQuery(begin_string,
+ "transaction_isolation",
+ list_make1(item->arg));
+ else if (strcmp(item->defname, "transaction_read_only") == 0)
+ begin_string = RewriteBeginQuery(begin_string,
+ "transaction_read_only",
+ list_make1(item->arg));
+ }
+ PGXCNodeSetBeginQuery(begin_string);
+ }
+#endif
}
break;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 869a172156..4d1c2a6f70 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -8320,4 +8320,44 @@ assign_application_name(const char *newval, bool doit, GucSource source)
return newval;
}
+#ifdef PGXC
+/*
+ * RewriteBeginQuery
+ *
+ * Rewrite transaction start query depending on the isolation level
+ * and read operation options.
+ */
+char *
+RewriteBeginQuery(char *query_string, const char *name, List *args)
+{
+ char *value = GetConfigOptionByName(name, NULL);
+
+ if (!query_string)
+ {
+ query_string = (char *)palloc(18);
+ sprintf(query_string, "START TRANSACTION");
+ }
+
+ if (strcmp(name, "transaction_isolation") == 0)
+ {
+ query_string = (char *)repalloc(query_string, strlen(query_string) + strlen(value) + 18);
+ sprintf(query_string, "%s ISOLATION LEVEL %s", query_string, value);
+ }
+ else if (strcmp(name, "transaction_read_only") == 0)
+ {
+ char buffer[512];
+ if (strcmp(value, "on") == 0)
+ sprintf(buffer, "READ ONLY");
+ else
+ sprintf(buffer, "READ WRITE");
+
+ query_string = (char *)repalloc(query_string, strlen(query_string) + strlen(buffer) + 2);
+ sprintf(query_string, "%s %s", query_string, buffer);
+ }
+
+ pfree(value);
+ return query_string;
+}
+#endif
+
#include "guc-file.c"
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index 405325b650..fb9232fca1 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -123,6 +123,7 @@ typedef struct RemoteQueryState
/* Multinode Executor */
extern void PGXCNodeBegin(void);
+extern void PGXCNodeSetBeginQuery(char *query_string);
extern void PGXCNodeCommit(bool bReleaseHandles);
extern int PGXCNodeRollback(void);
extern bool PGXCNodePrepare(char *gid);
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 9eb37b8860..6192d374da 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -277,6 +277,10 @@ extern void SetPGVariable(const char *name, List *args, bool is_local);
extern void GetPGVariable(const char *name, DestReceiver *dest);
extern TupleDesc GetPGVariableResultDesc(const char *name);
+#ifdef PGXC
+extern char *RewriteBeginQuery(char *query_string, const char *name, List *args);
+#endif
+
extern void ExecSetVariableStmt(VariableSetStmt *stmt);
extern char *ExtractSetVariableArgs(VariableSetStmt *stmt);