Create separate structure for current connection
authorMarko Kreen <markokr@gmail.com>
Tue, 27 Mar 2012 10:49:47 +0000 (13:49 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 29 Mar 2012 11:05:06 +0000 (14:05 +0300)
src/cluster.c
src/execute.c
src/main.c
src/plproxy.h

index e823defbfad8bb4c8597ee2f66a5fd65657c5bb9..c91592d3de72e5b23e5edca1e06cb8936f2f0f25 100644 (file)
@@ -146,6 +146,16 @@ plproxy_cluster_plan_init(void)
        init_done = 1;
 }
 
+static void free_state(ProxyConnectionState *st)
+{
+       if (!st)
+               return;
+       if (st->db)
+               PQfinish(st->db);
+       memset(st, 0, sizeof(*st));
+       pfree(st);
+}
+
 /*
  * Drop partition and connection data from cluster.
  */
@@ -158,12 +168,12 @@ free_connlist(ProxyCluster *cluster)
        for (i = 0; i < cluster->conn_count; i++)
        {
                conn = &cluster->conn_list[i];
-               if (conn->db)
-                       PQfinish(conn->db);
                if (conn->res)
                        PQclear(conn->res);
                if (conn->connstr)
                        pfree((void *) conn->connstr);
+               free_state(conn->cur);
+               conn->cur = NULL;
        }
        pfree(cluster->part_map);
        pfree(cluster->conn_list);
@@ -208,9 +218,11 @@ add_connection(ProxyCluster *cluster, char *connstr, int part_num)
        /* add new connection */
        if (!conn)
        {
-               conn = &cluster->conn_list[cluster->conn_count++];
+               conn = &cluster->conn_list[cluster->conn_count];
                conn->connstr = MemoryContextStrdup(cluster_mem, final->data);
                conn->cluster = cluster;
+               conn->cur = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnectionState));
+               cluster->conn_count++;
        }
 
        cluster->part_map[part_num] = conn;
@@ -843,7 +855,9 @@ fake_cluster(ProxyFunction *func, const char *connect_str)
        cluster->part_map[0] = conn;
 
        conn->connstr = pstrdup(cluster->name);
-       conn->state = C_NONE;
+
+       conn->cur = palloc0(sizeof(ProxyConnectionState));
+       conn->cur->state = C_NONE;
 
        MemoryContextSwitchTo(old_ctx);
 
@@ -937,6 +951,7 @@ static void
 clean_cluster(ProxyCluster *cluster, struct timeval * now)
 {
        ProxyConnection *conn;
+       ProxyConnectionState *cur;
        ProxyConfig *cf = &cluster->config;
        time_t          age;
        int                     i;
@@ -950,11 +965,13 @@ clean_cluster(ProxyCluster *cluster, struct timeval * now)
                        PQclear(conn->res);
                        conn->res = NULL;
                }
-               if (!conn->db)
+
+               cur = conn->cur;
+               if (!cur->db)
                        continue;
 
                drop = false;
-               if (PQstatus(conn->db) != CONNECTION_OK)
+               if (PQstatus(cur->db) != CONNECTION_OK)
                {
                        drop = true;
                }
@@ -964,16 +981,16 @@ clean_cluster(ProxyCluster *cluster, struct timeval * now)
                }
                else
                {
-                       age = now->tv_sec - conn->connect_time;
+                       age = now->tv_sec - cur->connect_time;
                        if (age >= cf->connection_lifetime)
                                drop = true;
                }
 
                if (drop)
                {
-                       PQfinish(conn->db);
-                       conn->db = NULL;
-                       conn->state = C_NONE;
+                       PQfinish(cur->db);
+                       cur->db = NULL;
+                       cur->state = C_NONE;
                }
        }
 }
index 41ceec7a23ad0ed830634d4f7ac8fa960b8d6792..5eced5868e5e27cd142ebb7aff46a00c6621012f 100644 (file)
@@ -67,7 +67,7 @@ static void
 conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc)
 {
        plproxy_error(func, "[%s] %s: %s",
-                                 PQdb(conn->db), desc, PQerrorMessage(conn->db));
+                                 PQdb(conn->cur->db), desc, PQerrorMessage(conn->cur->db));
 }
 
 /* Compare if major/minor match. Works on "MAJ.MIN.*" */
@@ -102,13 +102,13 @@ flush_connection(ProxyFunction *func, ProxyConnection *conn)
        int res;
 
        /* flush it down */
-       res = PQflush(conn->db);
+       res = PQflush(conn->cur->db);
 
        /* set actual state */
        if (res > 0)
-               conn->state = C_QUERY_WRITE;
+               conn->cur->state = C_QUERY_WRITE;
        else if (res == 0)
-               conn->state = C_QUERY_READ;
+               conn->cur->state = C_QUERY_READ;
        else
                conn_error(func, conn, "PQflush");
 }
@@ -130,14 +130,14 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn)
        /*
         * check if target server has same backend version.
         */
-       dst_ver = PQparameterStatus(conn->db, "server_version");
-       conn->same_ver = cmp_branch(dst_ver, PG_VERSION);
+       dst_ver = PQparameterStatus(conn->cur->db, "server_version");
+       conn->cur->same_ver = cmp_branch(dst_ver, PG_VERSION);
 
        /*
         * Make sure remote I/O is done using local server_encoding.
         */
        this_enc = GetDatabaseEncodingName();
-       dst_enc = PQparameterStatus(conn->db, "client_encoding");
+       dst_enc = PQparameterStatus(conn->cur->db, "client_encoding");
        if (dst_enc && strcmp(this_enc, dst_enc))
        {
                if (!sql)
@@ -148,7 +148,7 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn)
        /*
         * if second time in this function, they should be active already.
         */
-       if (sql && conn->tuning)
+       if (sql && conn->cur->tuning)
        {
                /* display SET query */
                appendStringInfo(sql, "-- does not seem to apply");
@@ -160,9 +160,9 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn)
         */
        if (sql)
        {
-               conn->tuning = 1;
-               conn->state = C_QUERY_WRITE;
-               if (!PQsendQuery(conn->db, sql->data))
+               conn->cur->tuning = 1;
+               conn->cur->state = C_QUERY_WRITE;
+               if (!PQsendQuery(conn->cur->db, sql->data))
                        conn_error(func, conn, "PQsendQuery");
                pfree(sql->data);
                pfree(sql);
@@ -171,7 +171,7 @@ tune_connection(ProxyFunction *func, ProxyConnection *conn)
                return 1;
        }
 
-       conn->tuning = 0;
+       conn->cur->tuning = 0;
        return 0;
 }
 
@@ -187,14 +187,14 @@ send_query(ProxyFunction *func, ProxyConnection *conn,
        int                     binary_result = 0;
 
        gettimeofday(&now, NULL);
-       conn->query_time = now.tv_sec;
+       conn->cur->query_time = now.tv_sec;
 
        tune_connection(func, conn);
-       if (conn->tuning)
+       if (conn->cur->tuning)
                return;
 
        /* use binary result only on same backend ver */
-       if (cf->disable_binary == 0 && conn->same_ver)
+       if (cf->disable_binary == 0 && conn->cur->same_ver)
        {
                /* binary recv for non-record types */
                if (func->ret_scalar)
@@ -210,8 +210,8 @@ send_query(ProxyFunction *func, ProxyConnection *conn,
        }
 
        /* send query */
-       conn->state = C_QUERY_WRITE;
-       res = PQsendQueryParams(conn->db, q->sql, q->arg_count,
+       conn->cur->state = C_QUERY_WRITE;
+       res = PQsendQueryParams(conn->cur->db, q->sql, q->arg_count,
                                                        NULL,           /* paramTypes */
                                                        values,         /* paramValues */
                                                        plengths,       /* paramLengths */
@@ -233,19 +233,19 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
        struct pollfd   pfd;
        ProxyConfig *cf = &func->cur_cluster->config;
 
-       if (PQstatus(conn->db) != CONNECTION_OK)
+       if (PQstatus(conn->cur->db) != CONNECTION_OK)
                return false;
 
        /* check if too old */
        if (cf->connection_lifetime > 0)
        {
-               t = now->tv_sec - conn->connect_time;
+               t = now->tv_sec - conn->cur->connect_time;
                if (t >= cf->connection_lifetime)
                        return false;
        }
 
        /* how long ts been idle */
-       t = now->tv_sec - conn->query_time;
+       t = now->tv_sec - conn->cur->query_time;
        if (t < PLPROXY_IDLE_CONN_CHECK)
                return true;
 
@@ -254,7 +254,7 @@ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
         * are events pending.  If there are drop the connection.
         */
 intr_loop:
-       pfd.fd = PQsocket(conn->db);
+       pfd.fd = PQsocket(conn->cur->db);
        pfd.events = POLLIN;
        pfd.revents = 0;
 
@@ -352,7 +352,7 @@ static void setup_keepalive(ProxyConnection *conn)
 {
        struct sockaddr sa;
        socklen_t salen = sizeof(sa);
-       int fd = PQsocket(conn->db);
+       int fd = PQsocket(conn->cur->db);
        ProxyConfig *config = &conn->cluster->config;
 
        /* turn on keepalive */
@@ -384,10 +384,10 @@ prepare_conn(ProxyFunction *func, ProxyConnection *conn)
        gettimeofday(&now, NULL);
 
        /* state should be C_READY or C_NONE */
-       switch (conn->state)
+       switch (conn->cur->state)
        {
                case C_DONE:
-                       conn->state = C_READY;
+                       conn->cur->state = C_READY;
                case C_READY:
                        if (check_old_conn(func, conn, &now))
                                return;
@@ -398,29 +398,29 @@ prepare_conn(ProxyFunction *func, ProxyConnection *conn)
                case C_QUERY_WRITE:
                        /* close rotten connection */
                        elog(NOTICE, "PL/Proxy: dropping stale conn");
-                       PQfinish(conn->db);
-                       conn->db = NULL;
-                       conn->state = C_NONE;
-                       conn->tuning = 0;
+                       PQfinish(conn->cur->db);
+                       conn->cur->db = NULL;
+                       conn->cur->state = C_NONE;
+                       conn->cur->tuning = 0;
                case C_NONE:
                        break;
        }
 
-       conn->connect_time = now.tv_sec;
+       conn->cur->connect_time = now.tv_sec;
 
        /* launch new connection */
-       conn->db = PQconnectStart(conn->connstr);
-       if (conn->db == NULL)
+       conn->cur->db = PQconnectStart(conn->connstr);
+       if (conn->cur->db == NULL)
                plproxy_error(func, "No memory for PGconn");
 
        /* tag connection dirty */
-       conn->state = C_CONNECT_WRITE;
+       conn->cur->state = C_CONNECT_WRITE;
 
-       if (PQstatus(conn->db) == CONNECTION_BAD)
+       if (PQstatus(conn->cur->db) == CONNECTION_BAD)
                conn_error(func, conn, "PQconnectStart");
 
        /* override default notice handler */
-       PQsetNoticeReceiver(conn->db, handle_notice, conn);
+       PQsetNoticeReceiver(conn->cur->db, handle_notice, conn);
 
        setup_keepalive(conn);
 }
@@ -437,13 +437,13 @@ another_result(ProxyFunction *func, ProxyConnection *conn)
        PGresult   *res;
 
        /* got one */
-       res = PQgetResult(conn->db);
+       res = PQgetResult(conn->cur->db);
        if (res == NULL)
        {
-               if (conn->tuning)
-                       conn->state = C_READY;
+               if (conn->cur->tuning)
+                       conn->cur->state = C_READY;
                else
-                       conn->state = C_DONE;
+                       conn->cur->state = C_DONE;
                return false;
        }
 
@@ -489,21 +489,21 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn)
        int                     res;
        PostgresPollingStatusType poll_res;
 
-       switch (conn->state)
+       switch (conn->cur->state)
        {
                case C_CONNECT_READ:
                case C_CONNECT_WRITE:
-                       poll_res = PQconnectPoll(conn->db);
+                       poll_res = PQconnectPoll(conn->cur->db);
                        switch (poll_res)
                        {
                                case PGRES_POLLING_WRITING:
-                                       conn->state = C_CONNECT_WRITE;
+                                       conn->cur->state = C_CONNECT_WRITE;
                                        break;
                                case PGRES_POLLING_READING:
-                                       conn->state = C_CONNECT_READ;
+                                       conn->cur->state = C_CONNECT_READ;
                                        break;
                                case PGRES_POLLING_OK:
-                                       conn->state = C_READY;
+                                       conn->cur->state = C_READY;
                                        break;
                                case PGRES_POLLING_ACTIVE:
                                case PGRES_POLLING_FAILED:
@@ -514,7 +514,7 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn)
                        flush_connection(func, conn);
                        break;
                case C_QUERY_READ:
-                       res = PQconsumeInput(conn->db);
+                       res = PQconsumeInput(conn->cur->db);
                        if (res == 0)
                                conn_error(func, conn, "PQconsumeInput");
 
@@ -522,7 +522,7 @@ handle_conn(ProxyFunction *func, ProxyConnection *conn)
                        while (1)
                        {
                                /* if PQisBusy, then incomplete result */
-                               if (PQisBusy(conn->db))
+                               if (PQisBusy(conn->cur->db))
                                        break;
 
                                /* got one */
@@ -579,7 +579,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
                        continue;
 
                /* decide what to do */
-               switch (conn->state)
+               switch (conn->cur->state)
                {
                        case C_DONE:
                        case C_READY:
@@ -597,7 +597,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
 
                /* add fd to proper set */
                pf = pfd_cache + numfds++;
-               pf->fd = PQsocket(conn->db);
+               pf->fd = PQsocket(conn->cur->db);
                pf->events = ev;
                pf->revents = 0;
        }
@@ -621,7 +621,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
                if (!conn->run_tag)
                        continue;
 
-               switch (conn->state)
+               switch (conn->cur->state)
                {
                        case C_DONE:
                        case C_READY:
@@ -637,7 +637,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
                /*
                 * they should be in same order as called,
                 */
-               fd = PQsocket(conn->db);
+               fd = PQsocket(conn->cur->db);
                if (pf->fd != fd)
                        elog(WARNING, "fd order from poll() is messed up?");
 
@@ -655,13 +655,13 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn
 {
        ProxyConfig *cf = &cluster->config;
 
-       switch (conn->state)
+       switch (conn->cur->state)
        {
                case C_CONNECT_READ:
                case C_CONNECT_WRITE:
                        if (cf->connect_timeout <= 0)
                                break;
-                       if (now - conn->connect_time <= cf->connect_timeout)
+                       if (now - conn->cur->connect_time <= cf->connect_timeout)
                                break;
                        plproxy_error(func, "connect timeout to: %s", conn->connstr);
                        break;
@@ -670,7 +670,7 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn
                case C_QUERY_WRITE:
                        if (cf->query_timeout <= 0)
                                break;
-                       if (now - conn->query_time <= cf->query_timeout)
+                       if (now - conn->cur->query_time <= cf->query_timeout)
                                break;
                        plproxy_error(func, "query timeout");
                        break;
@@ -702,7 +702,7 @@ remote_execute(ProxyFunction *func)
                pending++;
 
                /* if conn is ready, then send query away */
-               if (conn->state == C_READY)
+               if (conn->cur->state == C_READY)
                        send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
        }
 
@@ -726,10 +726,10 @@ remote_execute(ProxyFunction *func)
                                continue;
 
                        /* login finished, send query */
-                       if (conn->state == C_READY)
+                       if (conn->cur->state == C_READY)
                                send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
 
-                       if (conn->state != C_DONE)
+                       if (conn->cur->state != C_DONE)
                                pending++;
 
                        check_timeouts(func, cluster, conn, now.tv_sec);
@@ -748,7 +748,7 @@ remote_execute(ProxyFunction *func)
                if (!conn->run_tag)
                        continue;
 
-               if (conn->state != C_DONE)
+               if (conn->cur->state != C_DONE)
                        plproxy_error(func, "Unfinished connection");
                if (conn->res == NULL)
                        plproxy_error(func, "Lost result");
@@ -778,12 +778,12 @@ remote_cancel(ProxyFunction *func)
        for (i = 0; i < cluster->conn_count; i++)
        {
                conn = &cluster->conn_list[i];
-               if (conn->state == C_NONE ||
-                       conn->state == C_READY ||
-                       conn->state == C_DONE)
+               if (conn->cur->state == C_NONE ||
+                       conn->cur->state == C_READY ||
+                       conn->cur->state == C_DONE)
                        continue;
 
-               cancel = PQgetCancel(conn->db);
+               cancel = PQgetCancel(conn->cur->db);
                if (cancel == NULL)
                {
                        elog(NOTICE, "Invalid connection!");
index cdfdeff0f4ffee15cea0cd4e84448d98d766d574..861557ddb525ad7953b8725b6270ba49343c43d4 100644 (file)
@@ -109,7 +109,7 @@ plproxy_remote_error(ProxyFunction *func, ProxyConnection *conn, const PGresult
 
        ereport(elevel, (
                errcode(MAKE_SQLSTATE(ss[0], ss[1], ss[2], ss[3], ss[4])),
-               errmsg("%s(%d): [%s] REMOTE %s: %s", func->name, func->arg_count, PQdb(conn->db), sev, msg),
+               errmsg("%s(%d): [%s] REMOTE %s: %s", func->name, func->arg_count, PQdb(conn->cur->db), sev, msg),
                det ? errdetail("Remote detail: %s", det) : 0,
                hint ? errhint("Remote hint: %s", hint) : 0,
                spos ? errposition(atoi(spos)) : 0,
index c82fb11ae364d3e33cd4ed7f03d60bd4f7767e0a..dd193c3fd7775effe175ca31c72d58ce32ae086c 100644 (file)
@@ -135,21 +135,25 @@ typedef struct ProxyConfig
        int                     keepcnt;
 } ProxyConfig;
 
+typedef struct ProxyConnectionState {
+       PGconn     *db;                         /* libpq connection handle */
+       ConnState       state;                  /* Connection state */
+       time_t          connect_time;   /* When connection was started */
+       time_t          query_time;             /* When last query was sent */
+       bool            same_ver;               /* True if dest backend has same X.Y ver */
+       bool            tuning;                 /* True if tuning query is running on conn */
+} ProxyConnectionState;
+
 /* Single database connection */
-typedef struct
+typedef struct ProxyConnection
 {
        struct ProxyCluster *cluster;
        const char *connstr;            /* Connection string for libpq */
 
        /* state */
-       PGconn     *db;                         /* libpq connection handle */
        PGresult   *res;                        /* last resultset */
        int                     pos;                    /* Current position inside res */
-       ConnState       state;                  /* Connection state */
-       time_t          connect_time;   /* When connection was started */
-       time_t          query_time;             /* When last query was sent */
-       bool            same_ver;               /* True if dest backend has same X.Y ver */
-       bool            tuning;                 /* True if tuning query is running on conn */
+       ProxyConnectionState *cur;
 
        /*
         * Nonzero if this connection should be used. The actual tag value is only