diff options
| author | Marko Kreen | 2007-10-11 12:03:51 +0000 |
|---|---|---|
| committer | Marko Kreen | 2007-10-11 12:03:51 +0000 |
| commit | 40c9d55902aaf8d36a78159c02887d07ea3c62df (patch) | |
| tree | 4d083635623335d0f4fce6f655691b7cc416914c /src/execute.c | |
| parent | 417ecca0bcde10665f07029c92b953db69d3039c (diff) | |
experimantal server parameter syncing
Diffstat (limited to 'src/execute.c')
| -rw-r--r-- | src/execute.c | 148 |
1 files changed, 89 insertions, 59 deletions
diff --git a/src/execute.c b/src/execute.c index 1b8a123..faeeec7 100644 --- a/src/execute.c +++ b/src/execute.c @@ -68,6 +68,23 @@ cmp_branch(const char *this, const char *that) return true; } +static void +flush_connection(ProxyFunction *func, ProxyConnection *conn) +{ + int res; + + /* flush it down */ + res = PQflush(conn->db); + + /* set actual state */ + if (res > 0) + conn->state = C_QUERY_WRITE; + else if (res == 0) + conn->state = C_QUERY_READ; + else + conn_error(func, conn, "PQflush"); +} + /* * Small sanity checking for new connections. * @@ -75,40 +92,82 @@ cmp_branch(const char *this, const char *that) * - Does there happen any encoding conversations? * - Difference in standard_conforming_strings. */ -static void -check_new_connection(ProxyConnection *conn) +static int +tune_connection(ProxyFunction *func, ProxyConnection *conn) { - const char *px_client, - *px_server, - *dst_client, - *dst_server; + ProxyConfig *cf = &func->cur_cluster->config; + const char *this_enc, *dst_enc; const char *q_server; const char *dst_ver; int srvquotes = 0; + StringInfo sql = NULL; dst_ver = PQparameterStatus(conn->db, "server_version"); conn->same_ver = cmp_branch(dst_ver, PG_VERSION); + /* + * sync standard_conforming_strings + */ q_server = PQparameterStatus(conn->db, "standard_conforming_strings"); if (q_server && strcasecmp(q_server, "off") != 0) srvquotes = 1; if (standard_conforming_strings != srvquotes) - elog(WARNING, "PL/Proxy: different setting of" - " standard_conforming_strings"); - - px_client = pg_get_client_encoding_name(); - px_server = GetDatabaseEncodingName(); - dst_client = PQparameterStatus(conn->db, "client_encoding"); - dst_server = PQparameterStatus(conn->db, "server_encoding"); - if (strcmp(px_client, px_server) - || strcmp(px_client, dst_client) - || (dst_server && strcmp(px_client, dst_server))) { - elog(WARNING, "PL/Proxy: encoding mismatch:" - " proxy client/server: %s/%s," - " partition client/server: %s/%s", - px_client, px_server, dst_client, dst_server); + if (!sql) + sql = makeStringInfo(); + appendStringInfo(sql, "set standard_conforming_strings = '%s'; ", + standard_conforming_strings ? "on" : "off"); + } + + /* + * sync client_encoding + */ + this_enc = pg_get_client_encoding_name(); + dst_enc = PQparameterStatus(conn->db, "client_encoding"); + if (dst_enc && strcmp(this_enc, dst_enc)) + { + if (!sql) + sql = makeStringInfo(); + appendStringInfo(sql, "set client_encoding = '%s'; ", this_enc); + } + + /* + * if second time in this function, there should have been + * active already. + */ + if (sql && conn->tuning) + { + appendStringInfo(sql, "-- those parameters does not seem to apply"); + conn_error(func, conn, sql->data); + } + + /* add statement_timeout to query */ + if (cf->statement_timeout >= 0 && !conn->tuning) + { + if (!sql) + sql = makeStringInfo(); + appendStringInfo(sql, "set statement_timeout = %d; ", + cf->statement_timeout); + } + + /* + * send tuning query + */ + if (sql) + { + conn->tuning = 1; + conn->state = C_QUERY_WRITE; + if (!PQsendQuery(conn->db, sql->data)) + conn_error(func, conn, "PQsendQuery"); + pfree(sql->data); + pfree(sql); + + flush_connection(func, conn); + return 1; } + + conn->tuning = 0; + return 0; } /* send the query to server connection */ @@ -119,16 +178,15 @@ send_query(ProxyFunction *func, ProxyConnection *conn, int res; struct timeval now; ProxyQuery *q = func->remote_sql; - const char *sql; ProxyConfig *cf = &func->cur_cluster->config; int binary_result = 0; - StringInfoData buf; gettimeofday(&now, NULL); conn->query_time = now.tv_sec; - /* change state to tag conn unclean */ - conn->state = C_QUERY_WRITE; + tune_connection(func, conn); + if (conn->tuning) + return; /* use binary result only on same backend ver */ if (cf->disable_binary == 0 && conn->same_ver) @@ -146,20 +204,8 @@ send_query(ProxyFunction *func, ProxyConnection *conn, } } - /* prepared sql, no buffer */ - sql = q->sql; - buf.data = NULL; - - /* add statement_timeout to query */ - if (cf->statement_timeout >= 0) - { - initStringInfo(&buf); - appendStringInfo(&buf, "SET statement_timeout=%d; %s", - cf->statement_timeout, q->sql); - sql = buf.data; - } - /* send query */ + conn->state = C_QUERY_WRITE; res = PQsendQueryParams(conn->db, q->sql, q->arg_count, NULL, /* paramTypes */ values, /* paramValues */ @@ -170,19 +216,7 @@ send_query(ProxyFunction *func, ProxyConnection *conn, conn_error(func, conn, "PQsendQueryParams"); /* flush it down */ - res = PQflush(conn->db); - - /* set actual state */ - if (res > 0) - conn->state = C_QUERY_WRITE; - else if (res == 0) - conn->state = C_QUERY_READ; - else - conn_error(func, conn, "PQflush"); - - /* if buffer, free it */ - if (buf.data) - pfree(buf.data); + flush_connection(func, conn); } /* returns false of conn should be dropped */ @@ -304,7 +338,10 @@ another_result(ProxyFunction *func, ProxyConnection *conn) res = PQgetResult(conn->db); if (res == NULL) { - conn->state = C_DONE; + if (conn->tuning) + conn->state = C_READY; + else + conn->state = C_DONE; return false; } @@ -351,7 +388,6 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn) break; case PGRES_POLLING_OK: conn->state = C_READY; - check_new_connection(conn); break; case PGRES_POLLING_ACTIVE: case PGRES_POLLING_FAILED: @@ -359,13 +395,7 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn) } break; case C_QUERY_WRITE: - res = PQflush(conn->db); - if (res > 0) - conn->state = C_QUERY_WRITE; - else if (res == 0) - conn->state = C_QUERY_READ; - else - conn_error(func, conn, "PQflush"); + flush_connection(func, conn); break; case C_QUERY_READ: res = PQconsumeInput(conn->db); |
