{
ProxyConnection *conn = (ProxyConnection *)node;
+ aatree_destroy(&conn->userstate_tree);
if (conn->res)
PQclear(conn->res);
- if (conn->cur->db)
- PQfinish(conn->cur->db);
- pfree(conn->cur);
pfree(conn);
}
+static int state_user_cmp(uintptr_t val, struct AANode *node)
+{
+ const char *name = (const char *)val;
+ const ProxyConnectionState *state = (ProxyConnectionState *)node;
+
+ return strcmp(name, state->userinfo->username);
+}
+
+static void state_free(struct AANode *node, void *arg)
+{
+ ProxyConnectionState *state = (ProxyConnectionState *)node;
+ if (state->db)
+ PQfinish(state->db);
+ memset(state, 0, sizeof(*state));
+ pfree(state);
+}
+
+static int userinfo_cmp(uintptr_t val, struct AANode *node)
+{
+ const char *name = (const char *)val;
+ const ConnUserInfo *info = (ConnUserInfo *)node;
+
+ return strcmp(name, info->username);
+}
+
+static void userinfo_free(struct AANode *node, void *arg)
+{
+ ConnUserInfo *info = (ConnUserInfo *)node;
+ pfree((void*)info->username);
+ pfree((void*)info->connstr);
+ memset(info, 0, sizeof(*info));
+ pfree(info);
+}
+
/*
* Create cache memory area and prepare plans
*/
/*
* Drop partition and connection data from cluster.
*/
+
static void
free_connlist(ProxyCluster *cluster)
{
aatree_destroy(&cluster->conn_tree);
+ aatree_destroy(&cluster->userinfo_tree);
pfree(cluster->part_map);
pfree(cluster->active_list);
/*
* Add new database connection if it does not exists.
*/
-static ProxyConnection *
-add_connection(ProxyCluster *cluster, char *connstr, int part_num)
+static void
+add_connection(ProxyCluster *cluster, const char *connstr, int part_num)
{
struct AANode *node;
ProxyConnection *conn = NULL;
- char *username;
- StringInfo final;
-
- final = makeStringInfo();
- appendStringInfoString(final, connstr);
-
- /* append current user if not specified in connstr */
- if (strstr(connstr, "user=") == NULL)
- {
- username = GetUserNameFromId(GetSessionUserId());
- appendStringInfo(final, " user=%s", username);
- }
- connstr = final->data;
/* check if already have it */
node = aatree_search(&cluster->conn_tree, (uintptr_t)connstr);
conn = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnection));
conn->connstr = MemoryContextStrdup(cluster_mem, connstr);
conn->cluster = cluster;
- conn->cur = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnectionState));
+
+ aatree_init(&conn->userstate_tree, state_user_cmp, state_free);
aatree_insert(&cluster->conn_tree, (uintptr_t)connstr, &conn->node);
}
cluster->part_map[part_num] = conn;
-
- return conn;
}
/*
cluster->needs_reload = true;
}
+static void inval_one_umap(struct AANode *n, void *arg)
static void inval_umapping(struct AANode *n, void *arg)
{
ProxyCluster *cluster = (ProxyCluster *)n;
cluster = palloc0(sizeof(*cluster));
cluster->name = pstrdup(name);
- cluster->needs_reload = true;
aatree_init(&cluster->conn_tree, conn_cstr_cmp, conn_free);
+ aatree_init(&cluster->userinfo_tree, userinfo_cmp, userinfo_free);
MemoryContextSwitchTo(old_ctx);
return cluster;
}
+static void init_cluster_user(ProxyCluster *cluster, const char *username)
+{
+ ConnUserInfo *userinfo;
+ StringInfo tmp;
+ struct AANode *node;
+
+ node = aatree_search(&cluster->userinfo_tree, (uintptr_t)username);
+ if (node) {
+ userinfo = (ConnUserInfo *)node;
+ } else {
+ tmp = makeStringInfo();
+ appendStringInfo(tmp, "user=%s", username);
+ userinfo = MemoryContextAllocZero(cluster_mem, sizeof(*userinfo));
+ userinfo->username = MemoryContextStrdup(cluster_mem, username);
+ userinfo->connstr = MemoryContextStrdup(cluster_mem, tmp->data);
+ }
+
+ cluster->cur_userinfo = userinfo;
+}
+
/*
* Refresh the cluster.
*/
static void
refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
{
+ const char *username;
+ Oid user_oid;
+
+ user_oid = GetSessionUserId();
+ username = GetUserNameFromId(user_oid);
+
#ifdef PLPROXY_USE_SQLMED
if (cluster->needs_reload)
{
#endif
/* Either no SQL/MED support or no such foreign server */
- if (!cluster->sqlmed_cluster)
+ if (!cluster->sqlmed_cluster && !cluster->fake_cluster)
reload_plproxy_cluster(func, cluster);
+ init_cluster_user(cluster, username);
+
cluster->needs_reload = false;
+ pfree((void*)username);
}
/*
fake_cluster(ProxyFunction *func, const char *connect_str)
{
ProxyCluster *cluster;
- ProxyConnection *conn;
MemoryContext old_ctx;
struct AANode *n;
/* search if cached */
n = aatree_search(&fake_cluster_tree, (uintptr_t)connect_str);
if (n)
- return (ProxyCluster *)n;
+ {
+ cluster = (ProxyCluster *)n;
+ goto done;
+ }
/* create if not */
cluster = new_cluster(connect_str);
old_ctx = MemoryContextSwitchTo(cluster_mem);
- cluster->needs_reload = 0;
+ cluster->fake_cluster = true;
cluster->version = 1;
cluster->part_count = 1;
cluster->part_mask = 0;
cluster->part_map = palloc(cluster->part_count * sizeof(ProxyConnection *));
cluster->active_list = palloc(cluster->part_count * sizeof(ProxyConnection *));
- conn = palloc0(sizeof(ProxyConnection));
- conn->cluster = cluster;
- conn->connstr = pstrdup(cluster->name);
- conn->cur = palloc0(sizeof(ProxyConnectionState));
- conn->cur->state = C_NONE;
-
- aatree_insert(&cluster->conn_tree, (uintptr_t)conn->connstr, &conn->node);
- cluster->part_map[0] = conn;
-
MemoryContextSwitchTo(old_ctx);
+ add_connection(cluster, connect_str, 0);
+
aatree_insert(&fake_cluster_tree, (uintptr_t)connect_str, &cluster->node);
+done:
+ refresh_cluster(func, cluster);
return cluster;
}
if (!cluster)
{
cluster = new_cluster(name);
+ cluster->needs_reload = true;
aatree_insert(&cluster_tree, (uintptr_t)name, &cluster->node);
}
return cluster;
}
+/*
+ * Move connection to active list and init current
+ * connection state.
+ */
+void plproxy_activate_connection(struct ProxyConnection *conn)
+{
+ ProxyCluster *cluster = conn->cluster;
+ ConnUserInfo *userinfo = cluster->cur_userinfo;
+ const char *username = userinfo->username;
+ struct AANode *node;
+ ProxyConnectionState *cur;
+
+ /* move connection to active_list */
+ cluster->active_list[cluster->active_count] = conn;
+ cluster->active_count++;
+
+ /* fill ->cur pointer */
+
+ node = aatree_search(&conn->userstate_tree, (uintptr_t)username);
+ if (node) {
+ cur = (ProxyConnectionState *)node;
+ } else {
+ cur = MemoryContextAlloc(cluster_mem, sizeof(*cur));
+ cur->userinfo = userinfo;
+ aatree_insert(&conn->userstate_tree, (uintptr_t)username, &cur->node);
+ }
+ conn->cur = cur;
+}
+
/*
* Clean old connections and results from all clusters.
*/
-static void clean_conn(struct AANode *node, void *arg)
+struct MaintInfo {
+ struct ProxyConfig *cf;
+ struct timeval *now;
+};
+
+static void clean_state(struct AANode *node, void *arg)
{
- ProxyConnection *conn = (ProxyConnection *)node;
- ProxyConnectionState *cur;
- ProxyConfig *cf = &conn->cluster->config;
- struct timeval *now = arg;
+ ProxyConnectionState *cur = (ProxyConnectionState *)node;
+ struct MaintInfo *maint = arg;
+ ProxyConfig *cf = maint->cf;
+ struct timeval *now = maint->now;
time_t age;
bool drop;
- if (conn->res)
- {
- PQclear(conn->res);
- conn->res = NULL;
- }
-
- cur = conn->cur;
if (!cur->db)
return;
}
}
+static void clean_conn(struct AANode *node, void *arg)
+{
+ ProxyConnection *conn = (ProxyConnection *)node;
+ struct MaintInfo *maint = arg;
+
+ if (conn->res)
+ {
+ PQclear(conn->res);
+ conn->res = NULL;
+ }
+
+ aatree_walk(&conn->userstate_tree, AA_WALK_IN_ORDER, clean_state, maint);
+}
+
static void clean_cluster(struct AANode *n, void *arg)
{
ProxyCluster *cluster = (ProxyCluster *)n;
- struct timeval *now = arg;
+ struct MaintInfo maint;
- aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, clean_conn, now);
+ maint.cf = &cluster->config;
+ maint.now = arg;
+
+ aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, clean_conn, &maint);
}
void
aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now);
aatree_walk(&fake_cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now);
}
+