Per-user mapping for non-sqlmed clusters
authorMarko Kreen <markokr@gmail.com>
Wed, 28 Mar 2012 10:47:30 +0000 (13:47 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 29 Mar 2012 11:07:31 +0000 (14:07 +0300)
src/cluster.c
src/execute.c
src/plproxy.h

index 257d855f96633c8d8b3851fc6b3633b39b232b07..5519dcf5e5578c8c222e4d828ac82e89b7f3aa39 100644 (file)
@@ -108,14 +108,46 @@ static void conn_free(struct AANode *node, void *arg)
 {
        ProxyConnection *conn = (ProxyConnection *)node;
 
+       aatree_destroy(&conn->userstate_tree);
        if (conn->res)
                PQclear(conn->res);
-       if (conn->cur->db)
-               PQfinish(conn->cur->db);
-       pfree(conn->cur);
        pfree(conn);
 }
 
+static int state_user_cmp(uintptr_t val, struct AANode *node)
+{
+       const char *name = (const char *)val;
+       const ProxyConnectionState *state = (ProxyConnectionState *)node;
+
+       return strcmp(name, state->userinfo->username);
+}
+
+static void state_free(struct AANode *node, void *arg)
+{
+       ProxyConnectionState *state = (ProxyConnectionState *)node;
+       if (state->db)
+               PQfinish(state->db);
+       memset(state, 0, sizeof(*state));
+       pfree(state);
+}
+
+static int userinfo_cmp(uintptr_t val, struct AANode *node)
+{
+       const char *name = (const char *)val;
+       const ConnUserInfo *info = (ConnUserInfo *)node;
+
+       return strcmp(name, info->username);
+}
+
+static void userinfo_free(struct AANode *node, void *arg)
+{
+       ConnUserInfo *info = (ConnUserInfo *)node;
+       pfree((void*)info->username);
+       pfree((void*)info->connstr);
+       memset(info, 0, sizeof(*info));
+       pfree(info);
+}
+
 /*
  * Create cache memory area and prepare plans
  */
@@ -178,10 +210,12 @@ plproxy_cluster_plan_init(void)
 /*
  * Drop partition and connection data from cluster.
  */
+
 static void
 free_connlist(ProxyCluster *cluster)
 {
        aatree_destroy(&cluster->conn_tree);
+       aatree_destroy(&cluster->userinfo_tree);
 
        pfree(cluster->part_map);
        pfree(cluster->active_list);
@@ -195,24 +229,11 @@ free_connlist(ProxyCluster *cluster)
 /*
  * Add new database connection if it does not exists.
  */
-static ProxyConnection *
-add_connection(ProxyCluster *cluster, char *connstr, int part_num)
+static void
+add_connection(ProxyCluster *cluster, const char *connstr, int part_num)
 {
        struct AANode *node;
        ProxyConnection *conn = NULL;
-       char       *username;
-       StringInfo      final;
-
-       final = makeStringInfo();
-       appendStringInfoString(final, connstr);
-
-       /* append current user if not specified in connstr */
-       if (strstr(connstr, "user=") == NULL)
-       {
-               username = GetUserNameFromId(GetSessionUserId());
-               appendStringInfo(final, " user=%s", username);
-       }
-       connstr = final->data;
 
        /* check if already have it */
        node = aatree_search(&cluster->conn_tree, (uintptr_t)connstr);
@@ -225,14 +246,13 @@ add_connection(ProxyCluster *cluster, char *connstr, int part_num)
                conn = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnection));
                conn->connstr = MemoryContextStrdup(cluster_mem, connstr);
                conn->cluster = cluster;
-               conn->cur = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnectionState));
+
+               aatree_init(&conn->userstate_tree, state_user_cmp, state_free);
 
                aatree_insert(&cluster->conn_tree, (uintptr_t)connstr, &conn->node);
        }
 
        cluster->part_map[part_num] = conn;
-
-       return conn;
 }
 
 /*
@@ -709,6 +729,7 @@ static void inval_fserver(struct AANode *n, void *arg)
                cluster->needs_reload = true;
 }
 
+static void inval_one_umap(struct AANode *n, void *arg)
 static void inval_umapping(struct AANode *n, void *arg)
 {
        ProxyCluster *cluster = (ProxyCluster *)n;
@@ -787,21 +808,47 @@ new_cluster(const char *name)
 
        cluster = palloc0(sizeof(*cluster));
        cluster->name = pstrdup(name);
-       cluster->needs_reload = true;
 
        aatree_init(&cluster->conn_tree, conn_cstr_cmp, conn_free);
+       aatree_init(&cluster->userinfo_tree, userinfo_cmp, userinfo_free);
 
        MemoryContextSwitchTo(old_ctx);
 
        return cluster;
 }
 
+static void init_cluster_user(ProxyCluster *cluster, const char *username)
+{
+       ConnUserInfo *userinfo;
+       StringInfo tmp;
+       struct AANode *node;
+
+       node = aatree_search(&cluster->userinfo_tree, (uintptr_t)username);
+       if (node) {
+               userinfo = (ConnUserInfo *)node;
+       } else {
+               tmp = makeStringInfo();
+               appendStringInfo(tmp, "user=%s", username);
+               userinfo = MemoryContextAllocZero(cluster_mem, sizeof(*userinfo));
+               userinfo->username = MemoryContextStrdup(cluster_mem, username);
+               userinfo->connstr = MemoryContextStrdup(cluster_mem, tmp->data);
+       }
+
+       cluster->cur_userinfo = userinfo;
+}
+
 /*
  * Refresh the cluster.
  */
 static void
 refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
 {
+       const char *username;
+       Oid user_oid;
+
+       user_oid = GetSessionUserId();
+       username = GetUserNameFromId(user_oid);
+
 #ifdef PLPROXY_USE_SQLMED
        if (cluster->needs_reload)
        {
@@ -823,10 +870,13 @@ refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
 #endif
 
        /* Either no SQL/MED support or no such foreign server */
-       if (!cluster->sqlmed_cluster)
+       if (!cluster->sqlmed_cluster && !cluster->fake_cluster)
                reload_plproxy_cluster(func, cluster);
 
+       init_cluster_user(cluster, username);
+
        cluster->needs_reload = false;
+       pfree((void*)username);
 }
 
 /*
@@ -836,40 +886,37 @@ static ProxyCluster *
 fake_cluster(ProxyFunction *func, const char *connect_str)
 {
        ProxyCluster *cluster;
-       ProxyConnection *conn;
        MemoryContext old_ctx;
        struct AANode *n;
 
        /* search if cached */
        n = aatree_search(&fake_cluster_tree, (uintptr_t)connect_str);
        if (n)
-               return (ProxyCluster *)n;
+       {
+               cluster = (ProxyCluster *)n;
+               goto done;
+       }
 
        /* create if not */
        cluster = new_cluster(connect_str);
 
        old_ctx = MemoryContextSwitchTo(cluster_mem);
 
-       cluster->needs_reload = 0;
+       cluster->fake_cluster = true;
        cluster->version = 1;
        cluster->part_count = 1;
        cluster->part_mask = 0;
        cluster->part_map = palloc(cluster->part_count * sizeof(ProxyConnection *));
        cluster->active_list = palloc(cluster->part_count * sizeof(ProxyConnection *));
 
-       conn = palloc0(sizeof(ProxyConnection));
-       conn->cluster = cluster;
-       conn->connstr = pstrdup(cluster->name);
-       conn->cur = palloc0(sizeof(ProxyConnectionState));
-       conn->cur->state = C_NONE;
-
-       aatree_insert(&cluster->conn_tree, (uintptr_t)conn->connstr, &conn->node);
-       cluster->part_map[0] = conn;
-
        MemoryContextSwitchTo(old_ctx);
 
+       add_connection(cluster, connect_str, 0);
+
        aatree_insert(&fake_cluster_tree, (uintptr_t)connect_str, &cluster->node);
 
+done:
+       refresh_cluster(func, cluster);
        return cluster;
 }
 
@@ -942,6 +989,7 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
        if (!cluster)
        {
                cluster = new_cluster(name);
+               cluster->needs_reload = true;
                aatree_insert(&cluster_tree, (uintptr_t)name, &cluster->node);
        }
 
@@ -951,26 +999,53 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
        return cluster;
 }
 
+/*
+ * Move connection to active list and init current
+ * connection state.
+ */
+void plproxy_activate_connection(struct ProxyConnection *conn)
+{
+       ProxyCluster *cluster = conn->cluster;
+       ConnUserInfo *userinfo = cluster->cur_userinfo;
+       const char *username = userinfo->username;
+       struct AANode *node;
+       ProxyConnectionState *cur;
+
+       /* move connection to active_list */
+       cluster->active_list[cluster->active_count] = conn;
+       cluster->active_count++;
+
+       /* fill ->cur pointer */
+
+       node = aatree_search(&conn->userstate_tree, (uintptr_t)username);
+       if (node) {
+               cur = (ProxyConnectionState *)node;
+       } else {
+               cur = MemoryContextAlloc(cluster_mem, sizeof(*cur));
+               cur->userinfo = userinfo;
+               aatree_insert(&conn->userstate_tree, (uintptr_t)username, &cur->node);
+       }
+       conn->cur = cur;
+}
+
 /*
  * Clean old connections and results from all clusters.
  */
 
-static void clean_conn(struct AANode *node, void *arg)
+struct MaintInfo {
+       struct ProxyConfig *cf;
+       struct timeval *now;
+};
+
+static void clean_state(struct AANode *node, void *arg)
 {
-       ProxyConnection *conn = (ProxyConnection *)node;
-       ProxyConnectionState *cur;
-       ProxyConfig *cf = &conn->cluster->config;
-       struct timeval *now = arg;
+       ProxyConnectionState *cur = (ProxyConnectionState *)node;
+       struct MaintInfo *maint = arg;
+       ProxyConfig *cf = maint->cf;
+       struct timeval *now = maint->now;
        time_t          age;
        bool            drop;
 
-       if (conn->res)
-       {
-               PQclear(conn->res);
-               conn->res = NULL;
-       }
-
-       cur = conn->cur;
        if (!cur->db)
                return;
 
@@ -998,12 +1073,29 @@ static void clean_conn(struct AANode *node, void *arg)
        }
 }
 
+static void clean_conn(struct AANode *node, void *arg)
+{
+       ProxyConnection *conn = (ProxyConnection *)node;
+       struct MaintInfo *maint = arg;
+
+       if (conn->res)
+       {
+               PQclear(conn->res);
+               conn->res = NULL;
+       }
+
+       aatree_walk(&conn->userstate_tree, AA_WALK_IN_ORDER, clean_state, maint);
+}
+
 static void clean_cluster(struct AANode *n, void *arg)
 {
        ProxyCluster *cluster = (ProxyCluster *)n;
-       struct timeval *now = arg;
+       struct MaintInfo maint;
 
-       aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, clean_conn, now);
+       maint.cf = &cluster->config;
+       maint.now = arg;
+
+       aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, clean_conn, &maint);
 }
 
 void
@@ -1012,3 +1104,4 @@ plproxy_cluster_maint(struct timeval * now)
        aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now);
        aatree_walk(&fake_cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now);
 }
+
index e332a09d65fdacb2f143181a7b0102160a88cc46..320536fca4f8fa8efdd7749433609f571695dc49 100644 (file)
@@ -375,11 +375,26 @@ handle_notice(void *arg, const PGresult *res)
        plproxy_remote_error(cluster->cur_func, conn, res, false);
 }
 
+static const char *
+get_connstr(ProxyConnection *conn)
+{
+       StringInfoData cstr;
+       ConnUserInfo *info = conn->cluster->cur_userinfo;
+
+       if (strstr(conn->connstr, "user=") != NULL)
+               return pstrdup(conn->connstr);
+
+       initStringInfo(&cstr);
+       appendStringInfo(&cstr, "%s %s", conn->connstr, info->connstr);
+       return cstr.data;
+}
+
 /* check existing conn status or launch new conn */
 static void
 prepare_conn(ProxyFunction *func, ProxyConnection *conn)
 {
        struct timeval now;
+       const char *connstr;
 
        gettimeofday(&now, NULL);
 
@@ -409,7 +424,8 @@ prepare_conn(ProxyFunction *func, ProxyConnection *conn)
        conn->cur->connect_time = now.tv_sec;
 
        /* launch new connection */
-       conn->cur->db = PQconnectStart(conn->connstr);
+       connstr = get_connstr(conn);
+       conn->cur->db = PQconnectStart(connstr);
        if (conn->cur->db == NULL)
                plproxy_error(func, "No memory for PGconn");
 
@@ -805,10 +821,8 @@ static void tag_part(struct ProxyCluster *cluster, int i, int tag)
        ProxyConnection *conn = cluster->part_map[i];
 
        if (!conn->run_tag)
-       {
-               cluster->active_list[cluster->active_count] = conn;
-               cluster->active_count++;
-       }
+               plproxy_activate_connection(conn);
+
        conn->run_tag = tag;
 }
 
@@ -1130,6 +1144,8 @@ plproxy_clean_results(ProxyCluster *cluster)
                conn->pos = 0;
                conn->run_tag = 0;
                conn->bstate = NULL;
+               conn->cur = NULL;
+               cluster->active_list[i] = NULL;
        }
 
        /* reset active_list */
index f9e4573d5a13507bb0618f6576eb5ff900332ff8..a2c7e95d03b9301005a809bd20fbbbf148594e67 100644 (file)
@@ -136,7 +136,21 @@ typedef struct ProxyConfig
        int                     keepcnt;
 } ProxyConfig;
 
+typedef struct ConnUserInfo {
+       struct AANode node;
+
+       const char *username;
+       const char *connstr;
+
+       SysCacheStamp umStamp;
+       bool needs_reload;
+} ConnUserInfo;
+
 typedef struct ProxyConnectionState {
+       struct AANode node;                     /* node head in user->state tree */
+
+       ConnUserInfo *userinfo;
+
        PGconn     *db;                         /* libpq connection handle */
        ConnState       state;                  /* Connection state */
        time_t          connect_time;   /* When connection was started */
@@ -153,6 +167,8 @@ typedef struct ProxyConnection
        struct ProxyCluster *cluster;
        const char *connstr;            /* Connection string for libpq */
 
+       struct AATree userstate_tree; /* user->state tree */
+
        /* state */
        PGresult   *res;                        /* last resultset */
        int                     pos;                    /* Current position inside res */
@@ -194,10 +210,14 @@ typedef struct ProxyCluster
 
        struct AATree conn_tree;        /* connstr -> ProxyConnection */
 
+       struct AATree userinfo_tree; /* username->userinfo tree */
+       ConnUserInfo *cur_userinfo;     /* userinfo struct for current request */
+
        int                     ret_cur_conn;   /* Result walking: index of current conn */
        int                     ret_cur_pos;    /* Result walking: index of current row */
        int                     ret_total;              /* Result walking: total rows left */
 
+       bool            fake_cluster;   /* single connect-string cluster */
        bool            sqlmed_cluster; /* True if the cluster is defined using SQL/MED */
        bool            needs_reload;   /* True if the cluster partition list should be reloaded */
        bool            busy;                   /* True if the cluster is already involved in execution */
@@ -207,7 +227,6 @@ typedef struct ProxyCluster
         * Used in to perform cluster invalidation in syscache callbacks.
         */
        SysCacheStamp clusterStamp;
-       SysCacheStamp umStamp;
 
        /* notice processing: provide info about currently executing function */
        struct ProxyFunction    *cur_func;
@@ -398,6 +417,7 @@ void                plproxy_cluster_cache_init(void);
 void           plproxy_syscache_callback_init(void);
 ProxyCluster *plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo);
 void           plproxy_cluster_maint(struct timeval * now);
+void           plproxy_activate_connection(struct ProxyConnection *conn);
 
 /* result.c */
 Datum          plproxy_result(ProxyFunction *func, FunctionCallInfo fcinfo);