User mapping now almost works on sql/med too
authorMarko Kreen <markokr@gmail.com>
Wed, 28 Mar 2012 19:12:15 +0000 (22:12 +0300)
committerMarko Kreen <markokr@gmail.com>
Thu, 29 Mar 2012 11:07:32 +0000 (14:07 +0300)
src/cluster.c
src/execute.c
src/plproxy.h

index 5519dcf5e5578c8c222e4d828ac82e89b7f3aa39..20737c31e9a470541f6c717fe60cb8f01f195c58 100644 (file)
@@ -142,8 +142,12 @@ static int userinfo_cmp(uintptr_t val, struct AANode *node)
 static void userinfo_free(struct AANode *node, void *arg)
 {
        ConnUserInfo *info = (ConnUserInfo *)node;
-       pfree((void*)info->username);
-       pfree((void*)info->connstr);
+       pfree(info->username);
+       if (info->extra_connstr)
+       {
+               memset(info->extra_connstr, 0, strlen(info->extra_connstr));
+               pfree(info->extra_connstr);
+       }
        memset(info, 0, sizeof(*info));
        pfree(info);
 }
@@ -215,7 +219,6 @@ static void
 free_connlist(ProxyCluster *cluster)
 {
        aatree_destroy(&cluster->conn_tree);
-       aatree_destroy(&cluster->userinfo_tree);
 
        pfree(cluster->part_map);
        pfree(cluster->active_list);
@@ -535,6 +538,66 @@ plproxy_fdw_validator(PG_FUNCTION_ARGS)
        PG_RETURN_BOOL(true);
 }
 
+static void
+reload_sqlmed_user(ProxyFunction *func, ProxyCluster *cluster)
+{
+       ConnUserInfo *userinfo = cluster->cur_userinfo;
+
+       UserMapping        *um;
+       HeapTuple                       tup;
+       StringInfoData      cstr;
+       ListCell                   *cell;
+       AclResult                       aclresult;
+
+
+       um = GetUserMapping(userinfo->user_oid, cluster->sqlmed_server_oid);
+
+       /* retry same lookup so we can set cache stamp... */
+    tup = SearchSysCache(USERMAPPINGUSERSERVER,
+                                                ObjectIdGetDatum(um->userid),
+                                                ObjectIdGetDatum(um->serverid),
+                                                0, 0);
+    if (!HeapTupleIsValid(tup))
+       {
+               /* Specific mapping not found, try PUBLIC */
+               tup = SearchSysCache(USERMAPPINGUSERSERVER,
+                                                        ObjectIdGetDatum(InvalidOid),
+                                                        ObjectIdGetDatum(um->serverid),
+                                                        0, 0);
+               if (!HeapTupleIsValid(tup))
+                       elog(ERROR, "cache lookup failed for user mapping (%u,%u)",
+                               um->userid, um->serverid);
+       }
+       scstamp_set(USERMAPPINGOID, &userinfo->umStamp, tup);
+       ReleaseSysCache(tup);
+
+       /*
+        * Check permissions, user must have usage on the server.
+        */
+       aclresult = pg_foreign_server_aclcheck(um->serverid, um->userid, ACL_USAGE);
+       if (aclresult != ACLCHECK_OK)
+               aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, cluster->name);
+
+       /* Extract the common connect string elements from user mapping */
+       initStringInfo(&cstr);
+       foreach(cell, um->options)
+       {
+               DefElem    *def = lfirst(cell);
+
+               appendStringInfo(&cstr, " %s='%s'", def->defname, strVal(def->arg));
+       }
+
+       if (userinfo->extra_connstr)
+       {
+               memset(userinfo->extra_connstr, 0, strlen(userinfo->extra_connstr));
+               pfree(userinfo->extra_connstr);
+               userinfo->extra_connstr = NULL;
+       }
+
+       userinfo->extra_connstr = MemoryContextStrdup(cluster_mem, cstr.data);
+       memset(cstr.data, 0, cstr.len);
+}
+
 /*
  * Reload the cluster configuration and partitions from SQL/MED catalogs.
  */
@@ -542,19 +605,15 @@ static void
 reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster,
                                          ForeignServer *foreign_server)
 {
-       UserMapping        *user_mapping;
+       ConnUserInfo            *info = cluster->cur_userinfo;
        ForeignDataWrapper *fdw;
        HeapTuple                       tup;
        AclResult                       aclresult;
-       StringInfo          user_options;
        ListCell                   *cell;
-       Oid                                     userid;
        int                                     part_count = 0;
        int                                     part_num;
 
 
-       userid = GetSessionUserId();
-       user_mapping = GetUserMapping(userid, foreign_server->serverid);
        fdw = GetForeignDataWrapper(foreign_server->fdwid);
 
        /*
@@ -570,43 +629,13 @@ reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster,
        scstamp_set(FOREIGNSERVEROID, &cluster->clusterStamp, tup);
        ReleaseSysCache(tup);
 
-    tup = SearchSysCache(USERMAPPINGUSERSERVER,
-                                                ObjectIdGetDatum(user_mapping->userid),
-                                                ObjectIdGetDatum(foreign_server->serverid),
-                                                0, 0);
-
-    if (!HeapTupleIsValid(tup))
-       {
-               /* Specific mapping not found, try PUBLIC */
-               tup = SearchSysCache(USERMAPPINGUSERSERVER,
-                                                        ObjectIdGetDatum(InvalidOid),
-                                                        ObjectIdGetDatum(foreign_server->serverid),
-                                                        0, 0);
-               if (!HeapTupleIsValid(tup))
-                       elog(ERROR, "cache lookup failed for user mapping (%u,%u)",
-                               user_mapping->userid, foreign_server->serverid);
-       }
-
-       scstamp_set(USERMAPPINGOID, &cluster->umStamp, tup);
-
-       ReleaseSysCache(tup);
-
        /*
         * Check permissions, user must have usage on the server.
         */
-       aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, userid, ACL_USAGE);
+       aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, info->user_oid, ACL_USAGE);
        if (aclresult != ACLCHECK_OK)
                aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
 
-       /* Extract the common connect string elements from user mapping */
-       user_options = makeStringInfo();
-       foreach(cell, user_mapping->options)
-       {
-               DefElem    *def = lfirst(cell);
-
-               appendStringInfo(user_options, "%s='%s' ", def->defname, strVal(def->arg));
-       }
-
        /*
         * Collect the configuration definitions from foreign data wrapper.
         */
@@ -649,15 +678,11 @@ reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster,
        foreach(cell, foreign_server->options)
        {
                DefElem    *def = lfirst(cell);
-               StringInfo  buf = makeStringInfo();
 
                if (!extract_part_num(def->defname, &part_num))
                        continue;
 
-               appendStringInfo(buf, "%s%s%s", strVal(def->arg),
-                               user_options->len ? " " : "",
-                               user_options->data);
-               add_connection(cluster, buf->data, part_num);
+               add_connection(cluster, strVal(def->arg), part_num);
        }
 }
 
@@ -713,6 +738,34 @@ determine_compat_mode(ProxyCluster *cluster)
                elog(ERROR, "Pl/Proxy: cluster not found: %s", cluster->name);
 }
 
+static void inval_one_umap(struct AANode *n, void *arg)
+{
+       ConnUserInfo *info = (ConnUserInfo *)n;
+       SCInvalArg newStamp;
+
+       if (info->needs_reload)
+               /* already invalidated */
+               return;
+
+       if (arg == NULL)
+       {
+               info->needs_reload = true;
+               return;
+       }
+
+       newStamp = *(SCInvalArg *)arg;
+       if (scstamp_check(USERMAPPINGOID, &info->umStamp, newStamp))
+               /* user mappings changed */
+               info->needs_reload = true;
+}
+
+static void inval_umapping(struct AANode *n, void *arg)
+{
+       ProxyCluster *cluster = (ProxyCluster *)n;
+
+       aatree_walk(&cluster->userinfo_tree, AA_WALK_IN_ORDER, inval_one_umap, arg);
+}
+
 static void inval_fserver(struct AANode *n, void *arg)
 {
        ProxyCluster *cluster = (ProxyCluster *)n;
@@ -727,20 +780,10 @@ static void inval_fserver(struct AANode *n, void *arg)
        else if (scstamp_check(FOREIGNSERVEROID, &cluster->clusterStamp, newStamp))
                /* server definitions changed */
                cluster->needs_reload = true;
-}
-
-static void inval_one_umap(struct AANode *n, void *arg)
-static void inval_umapping(struct AANode *n, void *arg)
-{
-       ProxyCluster *cluster = (ProxyCluster *)n;
-       SCInvalArg newStamp = *(SCInvalArg *)arg;
 
+       /* tag all users too */
        if (cluster->needs_reload)
-               /* already invalidated */
-               return;
-       else if (scstamp_check(USERMAPPINGOID, &cluster->umStamp, newStamp))
-               /* user mappings changed */
-               cluster->needs_reload = true;
+               inval_umapping(&cluster->node, NULL);
 }
 
 /*
@@ -759,20 +802,24 @@ ClusterSyscacheCallback(Datum arg, int cacheid, SCInvalArg newStamp)
                aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, inval_umapping, &newStamp);
 }
 
-#endif
-
 /*
  * Register syscache invalidation callbacks for SQL/MED clusters.
  */
 void
 plproxy_syscache_callback_init(void)
 {
-#ifdef PLPROXY_USE_SQLMED
        CacheRegisterSyscacheCallback(FOREIGNSERVEROID, ClusterSyscacheCallback, (Datum) 0);
        CacheRegisterSyscacheCallback(USERMAPPINGOID, ClusterSyscacheCallback, (Datum) 0);
-#endif
 }
 
+#else /* !PLPROXY_USE_SQLMED */
+
+void plproxy_syscache_callback_init(void) {}
+
+#endif
+
+
+
 /*
  * Reload the cluster configuration and partitions from plproxy.get_cluster*
  * functions.
@@ -817,24 +864,74 @@ new_cluster(const char *name)
        return cluster;
 }
 
-static void init_cluster_user(ProxyCluster *cluster, const char *username)
+/*
+ * Invalidate all connections for particular user
+ */
+
+static void drop_userinfo_state(struct AANode *node, void *arg)
+{
+       ProxyConnectionState *cur = (ProxyConnectionState *)node;
+       ConnUserInfo *userinfo = arg;
+
+       if (cur->userinfo == userinfo && cur->db)
+       {
+               PQfinish(cur->db);
+               cur->db = NULL;
+               cur->state = C_NONE;
+       }
+}
+
+static void drop_userinfo_conn(struct AANode *node, void *arg)
+{
+       ProxyConnection *conn = (ProxyConnection *)node;
+       ConnUserInfo *userinfo = arg;
+
+       aatree_walk(&conn->userstate_tree, AA_WALK_IN_ORDER, drop_userinfo_state, userinfo);
+}
+
+static void inval_user_connections(ProxyCluster *cluster, ConnUserInfo *userinfo)
+{
+       /* find all connections with this user and drop them */
+       aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, drop_userinfo_conn, userinfo);
+
+       /*
+        * We can clear the flag only when it's certain
+        * that no connections with old info exist
+        */
+       userinfo->needs_reload = false;
+}
+
+/*
+ * Initialize user info struct
+ */
+
+static ConnUserInfo *
+get_userinfo(ProxyCluster *cluster, Oid user_oid)
 {
        ConnUserInfo *userinfo;
-       StringInfo tmp;
        struct AANode *node;
+       const char *username;
+
+       username = GetUserNameFromId(user_oid);
 
        node = aatree_search(&cluster->userinfo_tree, (uintptr_t)username);
        if (node) {
                userinfo = (ConnUserInfo *)node;
        } else {
-               tmp = makeStringInfo();
-               appendStringInfo(tmp, "user=%s", username);
                userinfo = MemoryContextAllocZero(cluster_mem, sizeof(*userinfo));
                userinfo->username = MemoryContextStrdup(cluster_mem, username);
-               userinfo->connstr = MemoryContextStrdup(cluster_mem, tmp->data);
+
+               aatree_insert(&cluster->userinfo_tree, (uintptr_t)username, &userinfo->node);
        }
 
-       cluster->cur_userinfo = userinfo;
+       if (userinfo->user_oid != user_oid)
+       {
+               /* user got renamed? */
+               userinfo->user_oid = user_oid;
+               userinfo->needs_reload = true;
+       }
+
+       return userinfo;
 }
 
 /*
@@ -843,11 +940,13 @@ static void init_cluster_user(ProxyCluster *cluster, const char *username)
 static void
 refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
 {
-       const char *username;
+       ConnUserInfo *uinfo;
        Oid user_oid;
 
        user_oid = GetSessionUserId();
-       username = GetUserNameFromId(user_oid);
+
+       uinfo = get_userinfo(cluster, user_oid);
+       cluster->cur_userinfo = uinfo;
 
 #ifdef PLPROXY_USE_SQLMED
        if (cluster->needs_reload)
@@ -865,18 +964,32 @@ refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
                if (!cluster->sqlmed_cluster)
                        determine_compat_mode(cluster);
                else
+               {
+                       cluster->sqlmed_server_oid = server->serverid;
                        reload_sqlmed_cluster(func, cluster, server);
+               }
        }
+
 #endif
 
+       if (uinfo->needs_reload)
+       {
+#ifdef PLPROXY_USE_SQLMED
+               if (cluster->sqlmed_cluster)
+               {
+                       inval_user_connections(cluster, uinfo);
+                       reload_sqlmed_user(func, cluster);
+               }
+               else
+#endif
+                       uinfo->needs_reload = false;
+       }
+
        /* Either no SQL/MED support or no such foreign server */
        if (!cluster->sqlmed_cluster && !cluster->fake_cluster)
                reload_plproxy_cluster(func, cluster);
 
-       init_cluster_user(cluster, username);
-
        cluster->needs_reload = false;
-       pfree((void*)username);
 }
 
 /*
@@ -1040,6 +1153,7 @@ struct MaintInfo {
 static void clean_state(struct AANode *node, void *arg)
 {
        ProxyConnectionState *cur = (ProxyConnectionState *)node;
+       ConnUserInfo *uinfo = cur->userinfo;
        struct MaintInfo *maint = arg;
        ProxyConfig *cf = maint->cf;
        struct timeval *now = maint->now;
@@ -1054,6 +1168,10 @@ static void clean_state(struct AANode *node, void *arg)
        {
                drop = true;
        }
+       else if (uinfo->needs_reload)
+       {
+               drop = true;
+       }
        else if (cf->connection_lifetime <= 0)
        {
                /* no aging */
index 320536fca4f8fa8efdd7749433609f571695dc49..6b546c29b4206c2dc8d52fbf77a9addc13df8f1a 100644 (file)
@@ -385,7 +385,10 @@ get_connstr(ProxyConnection *conn)
                return pstrdup(conn->connstr);
 
        initStringInfo(&cstr);
-       appendStringInfo(&cstr, "%s %s", conn->connstr, info->connstr);
+       if (info->extra_connstr)
+               appendStringInfo(&cstr, "%s %s", conn->connstr, info->extra_connstr);
+       else
+               appendStringInfo(&cstr, "%s user='%s'", conn->connstr, info->username);
        return cstr.data;
 }
 
index a2c7e95d03b9301005a809bd20fbbbf148594e67..f58c9cf6db31d1b67d46d140fc9a765fb88aa9b0 100644 (file)
@@ -138,9 +138,10 @@ typedef struct ProxyConfig
 
 typedef struct ConnUserInfo {
        struct AANode node;
+       Oid user_oid;
 
-       const char *username;
-       const char *connstr;
+       char *username;
+       char *extra_connstr;            /* user= and password= */
 
        SysCacheStamp umStamp;
        bool needs_reload;
@@ -217,6 +218,8 @@ typedef struct ProxyCluster
        int                     ret_cur_pos;    /* Result walking: index of current row */
        int                     ret_total;              /* Result walking: total rows left */
 
+       Oid                     sqlmed_server_oid;
+
        bool            fake_cluster;   /* single connect-string cluster */
        bool            sqlmed_cluster; /* True if the cluster is defined using SQL/MED */
        bool            needs_reload;   /* True if the cluster partition list should be reloaded */