static void userinfo_free(struct AANode *node, void *arg)
{
ConnUserInfo *info = (ConnUserInfo *)node;
- pfree((void*)info->username);
- pfree((void*)info->connstr);
+ pfree(info->username);
+ if (info->extra_connstr)
+ {
+ memset(info->extra_connstr, 0, strlen(info->extra_connstr));
+ pfree(info->extra_connstr);
+ }
memset(info, 0, sizeof(*info));
pfree(info);
}
free_connlist(ProxyCluster *cluster)
{
aatree_destroy(&cluster->conn_tree);
- aatree_destroy(&cluster->userinfo_tree);
pfree(cluster->part_map);
pfree(cluster->active_list);
PG_RETURN_BOOL(true);
}
+static void
+reload_sqlmed_user(ProxyFunction *func, ProxyCluster *cluster)
+{
+ ConnUserInfo *userinfo = cluster->cur_userinfo;
+
+ UserMapping *um;
+ HeapTuple tup;
+ StringInfoData cstr;
+ ListCell *cell;
+ AclResult aclresult;
+
+
+ um = GetUserMapping(userinfo->user_oid, cluster->sqlmed_server_oid);
+
+ /* retry same lookup so we can set cache stamp... */
+ tup = SearchSysCache(USERMAPPINGUSERSERVER,
+ ObjectIdGetDatum(um->userid),
+ ObjectIdGetDatum(um->serverid),
+ 0, 0);
+ if (!HeapTupleIsValid(tup))
+ {
+ /* Specific mapping not found, try PUBLIC */
+ tup = SearchSysCache(USERMAPPINGUSERSERVER,
+ ObjectIdGetDatum(InvalidOid),
+ ObjectIdGetDatum(um->serverid),
+ 0, 0);
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for user mapping (%u,%u)",
+ um->userid, um->serverid);
+ }
+ scstamp_set(USERMAPPINGOID, &userinfo->umStamp, tup);
+ ReleaseSysCache(tup);
+
+ /*
+ * Check permissions, user must have usage on the server.
+ */
+ aclresult = pg_foreign_server_aclcheck(um->serverid, um->userid, ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, cluster->name);
+
+ /* Extract the common connect string elements from user mapping */
+ initStringInfo(&cstr);
+ foreach(cell, um->options)
+ {
+ DefElem *def = lfirst(cell);
+
+ appendStringInfo(&cstr, " %s='%s'", def->defname, strVal(def->arg));
+ }
+
+ if (userinfo->extra_connstr)
+ {
+ memset(userinfo->extra_connstr, 0, strlen(userinfo->extra_connstr));
+ pfree(userinfo->extra_connstr);
+ userinfo->extra_connstr = NULL;
+ }
+
+ userinfo->extra_connstr = MemoryContextStrdup(cluster_mem, cstr.data);
+ memset(cstr.data, 0, cstr.len);
+}
+
/*
* Reload the cluster configuration and partitions from SQL/MED catalogs.
*/
reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster,
ForeignServer *foreign_server)
{
- UserMapping *user_mapping;
+ ConnUserInfo *info = cluster->cur_userinfo;
ForeignDataWrapper *fdw;
HeapTuple tup;
AclResult aclresult;
- StringInfo user_options;
ListCell *cell;
- Oid userid;
int part_count = 0;
int part_num;
- userid = GetSessionUserId();
- user_mapping = GetUserMapping(userid, foreign_server->serverid);
fdw = GetForeignDataWrapper(foreign_server->fdwid);
/*
scstamp_set(FOREIGNSERVEROID, &cluster->clusterStamp, tup);
ReleaseSysCache(tup);
- tup = SearchSysCache(USERMAPPINGUSERSERVER,
- ObjectIdGetDatum(user_mapping->userid),
- ObjectIdGetDatum(foreign_server->serverid),
- 0, 0);
-
- if (!HeapTupleIsValid(tup))
- {
- /* Specific mapping not found, try PUBLIC */
- tup = SearchSysCache(USERMAPPINGUSERSERVER,
- ObjectIdGetDatum(InvalidOid),
- ObjectIdGetDatum(foreign_server->serverid),
- 0, 0);
- if (!HeapTupleIsValid(tup))
- elog(ERROR, "cache lookup failed for user mapping (%u,%u)",
- user_mapping->userid, foreign_server->serverid);
- }
-
- scstamp_set(USERMAPPINGOID, &cluster->umStamp, tup);
-
- ReleaseSysCache(tup);
-
/*
* Check permissions, user must have usage on the server.
*/
- aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, userid, ACL_USAGE);
+ aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, info->user_oid, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
- /* Extract the common connect string elements from user mapping */
- user_options = makeStringInfo();
- foreach(cell, user_mapping->options)
- {
- DefElem *def = lfirst(cell);
-
- appendStringInfo(user_options, "%s='%s' ", def->defname, strVal(def->arg));
- }
-
/*
* Collect the configuration definitions from foreign data wrapper.
*/
foreach(cell, foreign_server->options)
{
DefElem *def = lfirst(cell);
- StringInfo buf = makeStringInfo();
if (!extract_part_num(def->defname, &part_num))
continue;
- appendStringInfo(buf, "%s%s%s", strVal(def->arg),
- user_options->len ? " " : "",
- user_options->data);
- add_connection(cluster, buf->data, part_num);
+ add_connection(cluster, strVal(def->arg), part_num);
}
}
elog(ERROR, "Pl/Proxy: cluster not found: %s", cluster->name);
}
+static void inval_one_umap(struct AANode *n, void *arg)
+{
+ ConnUserInfo *info = (ConnUserInfo *)n;
+ SCInvalArg newStamp;
+
+ if (info->needs_reload)
+ /* already invalidated */
+ return;
+
+ if (arg == NULL)
+ {
+ info->needs_reload = true;
+ return;
+ }
+
+ newStamp = *(SCInvalArg *)arg;
+ if (scstamp_check(USERMAPPINGOID, &info->umStamp, newStamp))
+ /* user mappings changed */
+ info->needs_reload = true;
+}
+
+static void inval_umapping(struct AANode *n, void *arg)
+{
+ ProxyCluster *cluster = (ProxyCluster *)n;
+
+ aatree_walk(&cluster->userinfo_tree, AA_WALK_IN_ORDER, inval_one_umap, arg);
+}
+
static void inval_fserver(struct AANode *n, void *arg)
{
ProxyCluster *cluster = (ProxyCluster *)n;
else if (scstamp_check(FOREIGNSERVEROID, &cluster->clusterStamp, newStamp))
/* server definitions changed */
cluster->needs_reload = true;
-}
-
-static void inval_one_umap(struct AANode *n, void *arg)
-static void inval_umapping(struct AANode *n, void *arg)
-{
- ProxyCluster *cluster = (ProxyCluster *)n;
- SCInvalArg newStamp = *(SCInvalArg *)arg;
+ /* tag all users too */
if (cluster->needs_reload)
- /* already invalidated */
- return;
- else if (scstamp_check(USERMAPPINGOID, &cluster->umStamp, newStamp))
- /* user mappings changed */
- cluster->needs_reload = true;
+ inval_umapping(&cluster->node, NULL);
}
/*
aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, inval_umapping, &newStamp);
}
-#endif
-
/*
* Register syscache invalidation callbacks for SQL/MED clusters.
*/
void
plproxy_syscache_callback_init(void)
{
-#ifdef PLPROXY_USE_SQLMED
CacheRegisterSyscacheCallback(FOREIGNSERVEROID, ClusterSyscacheCallback, (Datum) 0);
CacheRegisterSyscacheCallback(USERMAPPINGOID, ClusterSyscacheCallback, (Datum) 0);
-#endif
}
+#else /* !PLPROXY_USE_SQLMED */
+
+void plproxy_syscache_callback_init(void) {}
+
+#endif
+
+
+
/*
* Reload the cluster configuration and partitions from plproxy.get_cluster*
* functions.
return cluster;
}
-static void init_cluster_user(ProxyCluster *cluster, const char *username)
+/*
+ * Invalidate all connections for particular user
+ */
+
+static void drop_userinfo_state(struct AANode *node, void *arg)
+{
+ ProxyConnectionState *cur = (ProxyConnectionState *)node;
+ ConnUserInfo *userinfo = arg;
+
+ if (cur->userinfo == userinfo && cur->db)
+ {
+ PQfinish(cur->db);
+ cur->db = NULL;
+ cur->state = C_NONE;
+ }
+}
+
+static void drop_userinfo_conn(struct AANode *node, void *arg)
+{
+ ProxyConnection *conn = (ProxyConnection *)node;
+ ConnUserInfo *userinfo = arg;
+
+ aatree_walk(&conn->userstate_tree, AA_WALK_IN_ORDER, drop_userinfo_state, userinfo);
+}
+
+static void inval_user_connections(ProxyCluster *cluster, ConnUserInfo *userinfo)
+{
+ /* find all connections with this user and drop them */
+ aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, drop_userinfo_conn, userinfo);
+
+ /*
+ * We can clear the flag only when it's certain
+ * that no connections with old info exist
+ */
+ userinfo->needs_reload = false;
+}
+
+/*
+ * Initialize user info struct
+ */
+
+static ConnUserInfo *
+get_userinfo(ProxyCluster *cluster, Oid user_oid)
{
ConnUserInfo *userinfo;
- StringInfo tmp;
struct AANode *node;
+ const char *username;
+
+ username = GetUserNameFromId(user_oid);
node = aatree_search(&cluster->userinfo_tree, (uintptr_t)username);
if (node) {
userinfo = (ConnUserInfo *)node;
} else {
- tmp = makeStringInfo();
- appendStringInfo(tmp, "user=%s", username);
userinfo = MemoryContextAllocZero(cluster_mem, sizeof(*userinfo));
userinfo->username = MemoryContextStrdup(cluster_mem, username);
- userinfo->connstr = MemoryContextStrdup(cluster_mem, tmp->data);
+
+ aatree_insert(&cluster->userinfo_tree, (uintptr_t)username, &userinfo->node);
}
- cluster->cur_userinfo = userinfo;
+ if (userinfo->user_oid != user_oid)
+ {
+ /* user got renamed? */
+ userinfo->user_oid = user_oid;
+ userinfo->needs_reload = true;
+ }
+
+ return userinfo;
}
/*
static void
refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
{
- const char *username;
+ ConnUserInfo *uinfo;
Oid user_oid;
user_oid = GetSessionUserId();
- username = GetUserNameFromId(user_oid);
+
+ uinfo = get_userinfo(cluster, user_oid);
+ cluster->cur_userinfo = uinfo;
#ifdef PLPROXY_USE_SQLMED
if (cluster->needs_reload)
if (!cluster->sqlmed_cluster)
determine_compat_mode(cluster);
else
+ {
+ cluster->sqlmed_server_oid = server->serverid;
reload_sqlmed_cluster(func, cluster, server);
+ }
}
+
#endif
+ if (uinfo->needs_reload)
+ {
+#ifdef PLPROXY_USE_SQLMED
+ if (cluster->sqlmed_cluster)
+ {
+ inval_user_connections(cluster, uinfo);
+ reload_sqlmed_user(func, cluster);
+ }
+ else
+#endif
+ uinfo->needs_reload = false;
+ }
+
/* Either no SQL/MED support or no such foreign server */
if (!cluster->sqlmed_cluster && !cluster->fake_cluster)
reload_plproxy_cluster(func, cluster);
- init_cluster_user(cluster, username);
-
cluster->needs_reload = false;
- pfree((void*)username);
}
/*
static void clean_state(struct AANode *node, void *arg)
{
ProxyConnectionState *cur = (ProxyConnectionState *)node;
+ ConnUserInfo *uinfo = cur->userinfo;
struct MaintInfo *maint = arg;
ProxyConfig *cf = maint->cf;
struct timeval *now = maint->now;
{
drop = true;
}
+ else if (uinfo->needs_reload)
+ {
+ drop = true;
+ }
else if (cf->connection_lifetime <= 0)
{
/* no aging */