From 4d6fc59d18a50123b2d72fdf8e45b135a00a958a Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Tue, 27 Mar 2012 13:49:47 +0300 Subject: [PATCH] Create separate structure for current connection --- src/cluster.c | 37 +++++++++++----- src/execute.c | 120 +++++++++++++++++++++++++------------------------- src/main.c | 2 +- src/plproxy.h | 18 +++++--- 4 files changed, 99 insertions(+), 78 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e823def..c91592d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -146,6 +146,16 @@ plproxy_cluster_plan_init(void) init_done = 1; } +static void free_state(ProxyConnectionState *st) +{ + if (!st) + return; + if (st->db) + PQfinish(st->db); + memset(st, 0, sizeof(*st)); + pfree(st); +} + /* * Drop partition and connection data from cluster. */ @@ -158,12 +168,12 @@ free_connlist(ProxyCluster *cluster) for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (conn->db) - PQfinish(conn->db); if (conn->res) PQclear(conn->res); if (conn->connstr) pfree((void *) conn->connstr); + free_state(conn->cur); + conn->cur = NULL; } pfree(cluster->part_map); pfree(cluster->conn_list); @@ -208,9 +218,11 @@ add_connection(ProxyCluster *cluster, char *connstr, int part_num) /* add new connection */ if (!conn) { - conn = &cluster->conn_list[cluster->conn_count++]; + conn = &cluster->conn_list[cluster->conn_count]; conn->connstr = MemoryContextStrdup(cluster_mem, final->data); conn->cluster = cluster; + conn->cur = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnectionState)); + cluster->conn_count++; } cluster->part_map[part_num] = conn; @@ -843,7 +855,9 @@ fake_cluster(ProxyFunction *func, const char *connect_str) cluster->part_map[0] = conn; conn->connstr = pstrdup(cluster->name); - conn->state = C_NONE; + + conn->cur = palloc0(sizeof(ProxyConnectionState)); + conn->cur->state = C_NONE; MemoryContextSwitchTo(old_ctx); @@ -937,6 +951,7 @@ static void clean_cluster(ProxyCluster *cluster, struct timeval * now) { ProxyConnection *conn; + ProxyConnectionState *cur; ProxyConfig *cf = &cluster->config; time_t age; int i; @@ -950,11 +965,13 @@ clean_cluster(ProxyCluster *cluster, struct timeval * now) PQclear(conn->res); conn->res = NULL; } - if (!conn->db) + + cur = conn->cur; + if (!cur->db) continue; drop = false; - if (PQstatus(conn->db) != CONNECTION_OK) + if (PQstatus(cur->db) != CONNECTION_OK) { drop = true; } @@ -964,16 +981,16 @@ clean_cluster(ProxyCluster *cluster, struct timeval * now) } else { - age = now->tv_sec - conn->connect_time; + age = now->tv_sec - cur->connect_time; if (age >= cf->connection_lifetime) drop = true; } if (drop) { - PQfinish(conn->db); - conn->db = NULL; - conn->state = C_NONE; + PQfinish(cur->db); + cur->db = NULL; + cur->state = C_NONE; } } } diff --git a/src/execute.c b/src/execute.c index 41ceec7..5eced58 100644 --- a/src/execute.c +++ b/src/execute.c @@ -67,7 +67,7 @@ static void conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc) { plproxy_error(func, "[%s] %s: %s", - PQdb(conn->db), desc, PQerrorMessage(conn->db)); + PQdb(conn->cur->db), desc, PQerrorMessage(conn->cur->db)); } /* Compare if major/minor match. Works on "MAJ.MIN.*" */ @@ -102,13 +102,13 @@ flush_connection(ProxyFunction *func, ProxyConnection *conn) int res; /* flush it down */ - res = PQflush(conn->db); + res = PQflush(conn->cur->db); /* set actual state */ if (res > 0) - conn->state = C_QUERY_WRITE; + conn->cur->state = C_QUERY_WRITE; else if (res == 0) - conn->state = C_QUERY_READ; + conn->cur->state = C_QUERY_READ; else conn_error(func, conn, "PQflush"); } @@ -130,14 +130,14 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn) /* * check if target server has same backend version. */ - dst_ver = PQparameterStatus(conn->db, "server_version"); - conn->same_ver = cmp_branch(dst_ver, PG_VERSION); + dst_ver = PQparameterStatus(conn->cur->db, "server_version"); + conn->cur->same_ver = cmp_branch(dst_ver, PG_VERSION); /* * Make sure remote I/O is done using local server_encoding. */ this_enc = GetDatabaseEncodingName(); - dst_enc = PQparameterStatus(conn->db, "client_encoding"); + dst_enc = PQparameterStatus(conn->cur->db, "client_encoding"); if (dst_enc && strcmp(this_enc, dst_enc)) { if (!sql) @@ -148,7 +148,7 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn) /* * if second time in this function, they should be active already. */ - if (sql && conn->tuning) + if (sql && conn->cur->tuning) { /* display SET query */ appendStringInfo(sql, "-- does not seem to apply"); @@ -160,9 +160,9 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn) */ if (sql) { - conn->tuning = 1; - conn->state = C_QUERY_WRITE; - if (!PQsendQuery(conn->db, sql->data)) + conn->cur->tuning = 1; + conn->cur->state = C_QUERY_WRITE; + if (!PQsendQuery(conn->cur->db, sql->data)) conn_error(func, conn, "PQsendQuery"); pfree(sql->data); pfree(sql); @@ -171,7 +171,7 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn) return 1; } - conn->tuning = 0; + conn->cur->tuning = 0; return 0; } @@ -187,14 +187,14 @@ send_query(ProxyFunction *func, ProxyConnection *conn, int binary_result = 0; gettimeofday(&now, NULL); - conn->query_time = now.tv_sec; + conn->cur->query_time = now.tv_sec; tune_connection(func, conn); - if (conn->tuning) + if (conn->cur->tuning) return; /* use binary result only on same backend ver */ - if (cf->disable_binary == 0 && conn->same_ver) + if (cf->disable_binary == 0 && conn->cur->same_ver) { /* binary recv for non-record types */ if (func->ret_scalar) @@ -210,8 +210,8 @@ send_query(ProxyFunction *func, ProxyConnection *conn, } /* send query */ - conn->state = C_QUERY_WRITE; - res = PQsendQueryParams(conn->db, q->sql, q->arg_count, + conn->cur->state = C_QUERY_WRITE; + res = PQsendQueryParams(conn->cur->db, q->sql, q->arg_count, NULL, /* paramTypes */ values, /* paramValues */ plengths, /* paramLengths */ @@ -233,19 +233,19 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now) struct pollfd pfd; ProxyConfig *cf = &func->cur_cluster->config; - if (PQstatus(conn->db) != CONNECTION_OK) + if (PQstatus(conn->cur->db) != CONNECTION_OK) return false; /* check if too old */ if (cf->connection_lifetime > 0) { - t = now->tv_sec - conn->connect_time; + t = now->tv_sec - conn->cur->connect_time; if (t >= cf->connection_lifetime) return false; } /* how long ts been idle */ - t = now->tv_sec - conn->query_time; + t = now->tv_sec - conn->cur->query_time; if (t < PLPROXY_IDLE_CONN_CHECK) return true; @@ -254,7 +254,7 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now) * are events pending. If there are drop the connection. */ intr_loop: - pfd.fd = PQsocket(conn->db); + pfd.fd = PQsocket(conn->cur->db); pfd.events = POLLIN; pfd.revents = 0; @@ -352,7 +352,7 @@ static void setup_keepalive(ProxyConnection *conn) { struct sockaddr sa; socklen_t salen = sizeof(sa); - int fd = PQsocket(conn->db); + int fd = PQsocket(conn->cur->db); ProxyConfig *config = &conn->cluster->config; /* turn on keepalive */ @@ -384,10 +384,10 @@ prepare_conn(ProxyFunction *func, ProxyConnection *conn) gettimeofday(&now, NULL); /* state should be C_READY or C_NONE */ - switch (conn->state) + switch (conn->cur->state) { case C_DONE: - conn->state = C_READY; + conn->cur->state = C_READY; case C_READY: if (check_old_conn(func, conn, &now)) return; @@ -398,29 +398,29 @@ prepare_conn(ProxyFunction *func, ProxyConnection *conn) case C_QUERY_WRITE: /* close rotten connection */ elog(NOTICE, "PL/Proxy: dropping stale conn"); - PQfinish(conn->db); - conn->db = NULL; - conn->state = C_NONE; - conn->tuning = 0; + PQfinish(conn->cur->db); + conn->cur->db = NULL; + conn->cur->state = C_NONE; + conn->cur->tuning = 0; case C_NONE: break; } - conn->connect_time = now.tv_sec; + conn->cur->connect_time = now.tv_sec; /* launch new connection */ - conn->db = PQconnectStart(conn->connstr); - if (conn->db == NULL) + conn->cur->db = PQconnectStart(conn->connstr); + if (conn->cur->db == NULL) plproxy_error(func, "No memory for PGconn"); /* tag connection dirty */ - conn->state = C_CONNECT_WRITE; + conn->cur->state = C_CONNECT_WRITE; - if (PQstatus(conn->db) == CONNECTION_BAD) + if (PQstatus(conn->cur->db) == CONNECTION_BAD) conn_error(func, conn, "PQconnectStart"); /* override default notice handler */ - PQsetNoticeReceiver(conn->db, handle_notice, conn); + PQsetNoticeReceiver(conn->cur->db, handle_notice, conn); setup_keepalive(conn); } @@ -437,13 +437,13 @@ another_result(ProxyFunction *func, ProxyConnection *conn) PGresult *res; /* got one */ - res = PQgetResult(conn->db); + res = PQgetResult(conn->cur->db); if (res == NULL) { - if (conn->tuning) - conn->state = C_READY; + if (conn->cur->tuning) + conn->cur->state = C_READY; else - conn->state = C_DONE; + conn->cur->state = C_DONE; return false; } @@ -489,21 +489,21 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn) int res; PostgresPollingStatusType poll_res; - switch (conn->state) + switch (conn->cur->state) { case C_CONNECT_READ: case C_CONNECT_WRITE: - poll_res = PQconnectPoll(conn->db); + poll_res = PQconnectPoll(conn->cur->db); switch (poll_res) { case PGRES_POLLING_WRITING: - conn->state = C_CONNECT_WRITE; + conn->cur->state = C_CONNECT_WRITE; break; case PGRES_POLLING_READING: - conn->state = C_CONNECT_READ; + conn->cur->state = C_CONNECT_READ; break; case PGRES_POLLING_OK: - conn->state = C_READY; + conn->cur->state = C_READY; break; case PGRES_POLLING_ACTIVE: case PGRES_POLLING_FAILED: @@ -514,7 +514,7 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn) flush_connection(func, conn); break; case C_QUERY_READ: - res = PQconsumeInput(conn->db); + res = PQconsumeInput(conn->cur->db); if (res == 0) conn_error(func, conn, "PQconsumeInput"); @@ -522,7 +522,7 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn) while (1) { /* if PQisBusy, then incomplete result */ - if (PQisBusy(conn->db)) + if (PQisBusy(conn->cur->db)) break; /* got one */ @@ -579,7 +579,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) continue; /* decide what to do */ - switch (conn->state) + switch (conn->cur->state) { case C_DONE: case C_READY: @@ -597,7 +597,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) /* add fd to proper set */ pf = pfd_cache + numfds++; - pf->fd = PQsocket(conn->db); + pf->fd = PQsocket(conn->cur->db); pf->events = ev; pf->revents = 0; } @@ -621,7 +621,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) if (!conn->run_tag) continue; - switch (conn->state) + switch (conn->cur->state) { case C_DONE: case C_READY: @@ -637,7 +637,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) /* * they should be in same order as called, */ - fd = PQsocket(conn->db); + fd = PQsocket(conn->cur->db); if (pf->fd != fd) elog(WARNING, "fd order from poll() is messed up?"); @@ -655,13 +655,13 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn { ProxyConfig *cf = &cluster->config; - switch (conn->state) + switch (conn->cur->state) { case C_CONNECT_READ: case C_CONNECT_WRITE: if (cf->connect_timeout <= 0) break; - if (now - conn->connect_time <= cf->connect_timeout) + if (now - conn->cur->connect_time <= cf->connect_timeout) break; plproxy_error(func, "connect timeout to: %s", conn->connstr); break; @@ -670,7 +670,7 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn case C_QUERY_WRITE: if (cf->query_timeout <= 0) break; - if (now - conn->query_time <= cf->query_timeout) + if (now - conn->cur->query_time <= cf->query_timeout) break; plproxy_error(func, "query timeout"); break; @@ -702,7 +702,7 @@ remote_execute(ProxyFunction *func) pending++; /* if conn is ready, then send query away */ - if (conn->state == C_READY) + if (conn->cur->state == C_READY) send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats); } @@ -726,10 +726,10 @@ remote_execute(ProxyFunction *func) continue; /* login finished, send query */ - if (conn->state == C_READY) + if (conn->cur->state == C_READY) send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats); - if (conn->state != C_DONE) + if (conn->cur->state != C_DONE) pending++; check_timeouts(func, cluster, conn, now.tv_sec); @@ -748,7 +748,7 @@ remote_execute(ProxyFunction *func) if (!conn->run_tag) continue; - if (conn->state != C_DONE) + if (conn->cur->state != C_DONE) plproxy_error(func, "Unfinished connection"); if (conn->res == NULL) plproxy_error(func, "Lost result"); @@ -778,12 +778,12 @@ remote_cancel(ProxyFunction *func) for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (conn->state == C_NONE || - conn->state == C_READY || - conn->state == C_DONE) + if (conn->cur->state == C_NONE || + conn->cur->state == C_READY || + conn->cur->state == C_DONE) continue; - cancel = PQgetCancel(conn->db); + cancel = PQgetCancel(conn->cur->db); if (cancel == NULL) { elog(NOTICE, "Invalid connection!"); diff --git a/src/main.c b/src/main.c index cdfdeff..861557d 100644 --- a/src/main.c +++ b/src/main.c @@ -109,7 +109,7 @@ plproxy_remote_error(ProxyFunction *func, ProxyConnection *conn, const PGresult ereport(elevel, ( errcode(MAKE_SQLSTATE(ss[0], ss[1], ss[2], ss[3], ss[4])), - errmsg("%s(%d): [%s] REMOTE %s: %s", func->name, func->arg_count, PQdb(conn->db), sev, msg), + errmsg("%s(%d): [%s] REMOTE %s: %s", func->name, func->arg_count, PQdb(conn->cur->db), sev, msg), det ? errdetail("Remote detail: %s", det) : 0, hint ? errhint("Remote hint: %s", hint) : 0, spos ? errposition(atoi(spos)) : 0, diff --git a/src/plproxy.h b/src/plproxy.h index c82fb11..dd193c3 100644 --- a/src/plproxy.h +++ b/src/plproxy.h @@ -135,21 +135,25 @@ typedef struct ProxyConfig int keepcnt; } ProxyConfig; +typedef struct ProxyConnectionState { + PGconn *db; /* libpq connection handle */ + ConnState state; /* Connection state */ + time_t connect_time; /* When connection was started */ + time_t query_time; /* When last query was sent */ + bool same_ver; /* True if dest backend has same X.Y ver */ + bool tuning; /* True if tuning query is running on conn */ +} ProxyConnectionState; + /* Single database connection */ -typedef struct +typedef struct ProxyConnection { struct ProxyCluster *cluster; const char *connstr; /* Connection string for libpq */ /* state */ - PGconn *db; /* libpq connection handle */ PGresult *res; /* last resultset */ int pos; /* Current position inside res */ - ConnState state; /* Connection state */ - time_t connect_time; /* When connection was started */ - time_t query_time; /* When last query was sent */ - bool same_ver; /* True if dest backend has same X.Y ver */ - bool tuning; /* True if tuning query is running on conn */ + ProxyConnectionState *cur; /* * Nonzero if this connection should be used. The actual tag value is only -- 2.39.5