diff options
| author | Michael P | 2011-06-16 00:05:43 +0000 |
|---|---|---|
| committer | Michael P | 2011-06-16 00:05:43 +0000 |
| commit | 88a19b42f3b599927b74e0aef2e19b9161b3a7eb (patch) | |
| tree | 9522071fb3affa0309bebde19de9db1bbc80ccb5 /src | |
| parent | bcfa7b115f2a76e0146016dee36995df6eab89d0 (diff) | |
Support for START TRANSACTION
Until now XC was just sending down to Datanodes a plain BEGIN
query each time a transaction was begun on backend nodes.
This commit extends support of transaction start with
isolation level and read operation customizations.
Example of queries:
START TRANSACTION ISOLATION LEVEL READ COMMITTED;
START TRANSACTION READ WRITE;
Regression test transaction is now fixed.
Diffstat (limited to 'src')
| -rw-r--r-- | src/backend/pgxc/pool/execRemote.c | 59 | ||||
| -rw-r--r-- | src/backend/tcop/utility.c | 32 | ||||
| -rw-r--r-- | src/backend/utils/misc/guc.c | 40 | ||||
| -rw-r--r-- | src/include/pgxc/execRemote.h | 1 | ||||
| -rw-r--r-- | src/include/utils/guc.h | 4 |
5 files changed, 130 insertions, 6 deletions
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c index ee02109a82..03482a0721 100644 --- a/src/backend/pgxc/pool/execRemote.c +++ b/src/backend/pgxc/pool/execRemote.c @@ -53,6 +53,7 @@ static bool is_ddl = false; static bool implicit_force_autocommit = false; static PGXCNodeHandle **write_node_list = NULL; static int write_node_count = 0; +static char *begin_string = NULL; static int pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections, GlobalTransactionId gxid); @@ -1536,8 +1537,16 @@ pgxc_node_begin(int conn_count, PGXCNodeHandle ** connections, if (GlobalTimestampIsValid(timestamp) && pgxc_node_send_timestamp(connections[i], timestamp)) return EOF; - if (pgxc_node_send_query(connections[i], "BEGIN")) - return EOF; + if (begin_string) + { + if (pgxc_node_send_query(connections[i], begin_string)) + return EOF; + } + else + { + if (pgxc_node_send_query(connections[i], "BEGIN")) + return EOF; + } } combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE); @@ -1573,6 +1582,22 @@ PGXCNodeBegin(void) clear_write_node_list(); } +void +PGXCNodeSetBeginQuery(char *query_string) +{ + int len; + + if (!query_string) + return; + + len = strlen(query_string); + /* + * This query string is sent to backend nodes, + * it contains serializable and read options + */ + begin_string = (char *)malloc(len + 1); + begin_string = memcpy(begin_string, query_string, len + 1); +} /* * Prepare transaction on Datanodes and Coordinators involved in current transaction. @@ -1624,6 +1649,11 @@ finish: if (!PersistentConnections) release_handles(); autocommit = true; + if (begin_string) + { + free(begin_string); + begin_string = NULL; + } is_ddl = false; clear_write_node_list(); @@ -1898,6 +1928,11 @@ finish: if (!PersistentConnections && res == 0) release_handles(); autocommit = true; + if (begin_string) + { + free(begin_string); + begin_string = NULL; + } is_ddl = false; clear_write_node_list(); @@ -2034,6 +2069,11 @@ finish: if (!PersistentConnections) release_handles(); autocommit = true; + if (begin_string) + { + free(begin_string); + begin_string = NULL; + } is_ddl = false; clear_write_node_list(); @@ -2158,6 +2198,11 @@ finish: if (!PersistentConnections) release_handles(); autocommit = true; + if (begin_string) + { + free(begin_string); + begin_string = NULL; + } is_ddl = false; clear_write_node_list(); @@ -2250,6 +2295,11 @@ finish: if (!PersistentConnections && bReleaseHandles) release_handles(); autocommit = true; + if (begin_string) + { + free(begin_string); + begin_string = NULL; + } is_ddl = false; clear_write_node_list(); @@ -2319,6 +2369,11 @@ finish: if (!PersistentConnections) release_handles(); autocommit = true; + if (begin_string) + { + free(begin_string); + begin_string = NULL; + } is_ddl = false; clear_write_node_list(); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 9e2e9d44bf..e9e7afd271 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -370,13 +370,10 @@ standard_ProcessUtility(Node *parsetree, { ListCell *lc; #ifdef PGXC - /* - * If a COMMIT PREPARED message is received from another Coordinator, - * Don't send it down to Datanodes. - */ if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) PGXCNodeBegin(); #endif + BeginTransactionBlock(); foreach(lc, stmt->options) { @@ -391,6 +388,33 @@ standard_ProcessUtility(Node *parsetree, list_make1(item->arg), true); } + +#ifdef PGXC + /* + * Now that all the local variables have been set, + * it is time to rebuild the query. + */ + if (IS_PGXC_COORDINATOR && !IsConnFromCoord()) + { + char *begin_string = NULL; + + /* Result is palloc'd */ + foreach(lc, stmt->options) + { + DefElem *item = (DefElem *) lfirst(lc); + + if (strcmp(item->defname, "transaction_isolation") == 0) + begin_string = RewriteBeginQuery(begin_string, + "transaction_isolation", + list_make1(item->arg)); + else if (strcmp(item->defname, "transaction_read_only") == 0) + begin_string = RewriteBeginQuery(begin_string, + "transaction_read_only", + list_make1(item->arg)); + } + PGXCNodeSetBeginQuery(begin_string); + } +#endif } break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 869a172156..4d1c2a6f70 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -8320,4 +8320,44 @@ assign_application_name(const char *newval, bool doit, GucSource source) return newval; } +#ifdef PGXC +/* + * RewriteBeginQuery + * + * Rewrite transaction start query depending on the isolation level + * and read operation options. + */ +char * +RewriteBeginQuery(char *query_string, const char *name, List *args) +{ + char *value = GetConfigOptionByName(name, NULL); + + if (!query_string) + { + query_string = (char *)palloc(18); + sprintf(query_string, "START TRANSACTION"); + } + + if (strcmp(name, "transaction_isolation") == 0) + { + query_string = (char *)repalloc(query_string, strlen(query_string) + strlen(value) + 18); + sprintf(query_string, "%s ISOLATION LEVEL %s", query_string, value); + } + else if (strcmp(name, "transaction_read_only") == 0) + { + char buffer[512]; + if (strcmp(value, "on") == 0) + sprintf(buffer, "READ ONLY"); + else + sprintf(buffer, "READ WRITE"); + + query_string = (char *)repalloc(query_string, strlen(query_string) + strlen(buffer) + 2); + sprintf(query_string, "%s %s", query_string, buffer); + } + + pfree(value); + return query_string; +} +#endif + #include "guc-file.c" diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h index 405325b650..fb9232fca1 100644 --- a/src/include/pgxc/execRemote.h +++ b/src/include/pgxc/execRemote.h @@ -123,6 +123,7 @@ typedef struct RemoteQueryState /* Multinode Executor */ extern void PGXCNodeBegin(void); +extern void PGXCNodeSetBeginQuery(char *query_string); extern void PGXCNodeCommit(bool bReleaseHandles); extern int PGXCNodeRollback(void); extern bool PGXCNodePrepare(char *gid); diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 9eb37b8860..6192d374da 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -277,6 +277,10 @@ extern void SetPGVariable(const char *name, List *args, bool is_local); extern void GetPGVariable(const char *name, DestReceiver *dest); extern TupleDesc GetPGVariableResultDesc(const char *name); +#ifdef PGXC +extern char *RewriteBeginQuery(char *query_string, const char *name, List *args); +#endif + extern void ExecSetVariableStmt(VariableSetStmt *stmt); extern char *ExtractSetVariableArgs(VariableSetStmt *stmt); |
