-= PL/Proxy Cluster Configuration API =
+= PL/Proxy Cluster Configuration =
PL/Proxy can be used in either CONNECT mode or CLUSTER mode.
PL/Proxy can also be used in CLUSTER mode where it provides support for
partitioning data across multiple databases based on a clustering function.
-When using PL/Proxy in CONNECT mode no configuration functions are required.
-However, using PL/Proxy in CLUSTER mode requires the following configuration
-functions to be defined.
+When using PL/Proxy in CONNECT mode no special configuration is required.
+However, using PL/Proxy in CLUSTER mode requires the cluster configuration
+to be defined, either by the cluster configuration API or SQL/MED.
+== Cluster configuration API ==
-== plproxy.get_cluster_version(cluster_name) ==
+The following plproxy schema functions are used to define the clusters:
+
+=== plproxy.get_cluster_version(cluster_name) ===
plproxy.get_cluster_version(cluster_name text)
returns integer
-== plproxy.get_cluster_partitions(cluster_name) ==
+=== plproxy.get_cluster_partitions(cluster_name) ===
plproxy.get_cluster_partitions(cluster_name text)
returns setof text
END;
$$ LANGUAGE plpgsql;
-== plproxy.get_cluster_config(cluster) ==
+=== plproxy.get_cluster_config(cluster) ===
plproxy.get_cluster_config(in cluster_name text,
out key text, out val text)
END;
$$ LANGUAGE plpgsql;
+== SQL/MED cluster definitions ==
+
+Pl/Proxy can take advantage of SQL/MED connection info management available
+in PostgreSQL 8.4 and above. The benefits of using SQL/MED are simplified
+cluster definition management and slightly improved performance.
+
+Both SQL/MED defined clusters and configuration function based clusters can
+coexist in the same database. If a cluster is defined in both, SQL/MED takes
+precedence. If no SQL/MED cluster is found an attempt is made to fall back to
+configuration functions.
+
+=== Configuring SQL/MED clusters ===
+
+First we need to create a foreign data wrapper. Generally the FDW is a kind of
+driver that provides remote database access, data marshalling etc. In this
+case, it's main role is to provide a validation function that sanity checks
+your cluster definitions.
+
+Note: the validation function is known to be broken in PostgreSQL 8.4.2 and
+below.
+
+ CREATE FOREIGN DATA WRAPPER plproxy [ VALIDATOR plproxy_fdw_validator ];
+
+Next we need to define a CLUSTER, this is done by creating a SERVER that uses
+the plproxy FDW. The options to the SERVER are Pl/Proxy configuration settings
+and the list of cluster partitions.
+
+Note: USAGE access to the SERVER must be explicitly granted. Without this,
+users are unable to use the cluster.
+
+ CREATE SERVER a_cluster FOREIGN DATA WRAPPER plproxy
+ OPTIONS (
+ connection_lifetime '1800',
+ disable_binary '1',
+ p0 'dbname=part00 hostname=127.0.0.1',
+ p1 'dbname=part01 hostname=127.0.0.1',
+ p2 'dbname=part02 hostname=127.0.0.1',
+ p3 'dbname=part03 hostname=127.0.0.1'
+ );
+
+Finally we need to create a user mapping for the Pl/Proxy users. One might
+create individual mappings for specific users:
+
+ CREATE USER MAPPING FOR bob SERVER a_cluster
+ OPTIONS (user 'bob', password 'secret');
+
+or create a PUBLIC mapping for all users of the system:
+
+ CREATE USER MAPPING FOR public SERVER a_cluster
+ OPTIONS (user 'plproxy', password 'foo');
+
+Also it is possible to create both individual and PUBLIC mapping, in this case
+the individual mapping takes precedence.
/* query for fetching cluster config */
static const char config_sql[] = "select * from plproxy.get_cluster_config($1)";
+#ifdef PLPROXY_USE_SQLMED
+
+/* list of all the valid configuration options to plproxy cluster */
+static const char *cluster_config_options[] = {
+ "statement_timeout",
+ "connection_lifetime",
+ "query_timeout",
+ "disable_binary",
+ NULL
+};
+
+extern Datum plproxy_fdw_validator(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(plproxy_fdw_validator);
+
+#endif
/*
* Connsetion count should be non-zero and power of 2.
* Add new database connection if it does not exists.
*/
static ProxyConnection *
-add_connection(ProxyCluster *cluster, char *connstr)
+add_connection(ProxyCluster *cluster, char *connstr, int part_num)
{
int i;
- ProxyConnection *conn;
+ ProxyConnection *conn = NULL;
char *username;
StringInfo final;
}
/* check if already have it */
- for (i = 0; i < cluster->conn_count; i++)
+ for (i = 0; i < cluster->conn_count && !conn; i++)
{
- conn = &cluster->conn_list[i];
- if (strcmp(conn->connstr, final->data) == 0)
- return conn;
+ ProxyConnection *c = &cluster->conn_list[i];
+
+ if (strcmp(c->connstr, final->data) == 0)
+ conn = c;
}
/* add new connection */
- conn = &cluster->conn_list[cluster->conn_count++];
- conn->connstr = MemoryContextStrdup(cluster_mem, final->data);
+ if (!conn)
+ {
+ conn = &cluster->conn_list[cluster->conn_count++];
+ conn->connstr = MemoryContextStrdup(cluster_mem, final->data);
+ }
+
+ cluster->part_map[part_num] = conn;
return conn;
}
return DatumGetInt32(bin_val);
}
+/* set a configuration option. */
+static void
+set_config_key(ProxyFunction *func, ProxyConfig *cf, const char *key, const char *val)
+{
+ if (pg_strcasecmp(key, "statement_timeout") == 0)
+ /* ignore */ ;
+ else if (pg_strcasecmp("connection_lifetime", key) == 0)
+ cf->connection_lifetime = atoi(val);
+ else if (pg_strcasecmp("query_timeout", key) == 0)
+ cf->query_timeout = atoi(val);
+ else if (pg_strcasecmp("disable_binary", key) == 0)
+ cf->disable_binary = atoi(val);
+ else
+ plproxy_error(func, "Unknown config param: %s", key);
+}
+
/*
* Fetch cluster configuration.
*/
TupleDesc desc;
const char *key,
*val;
- ProxyConfig *cf = &cluster->config;
/* run query */
err = SPI_execute_plan(config_plan, &dname, NULL, false, 0);
if (val == NULL)
plproxy_error(func, "val must not be NULL");
- if (pg_strcasecmp(key, "statement_timeout") == 0)
- /* ignore */ ;
- else if (pg_strcasecmp("connection_lifetime", key) == 0)
- cf->connection_lifetime = atoi(val);
- else if (pg_strcasecmp("query_timeout", key) == 0)
- cf->query_timeout = atoi(val);
- else if (pg_strcasecmp("disable_binary", key) == 0)
- cf->disable_binary = atoi(val);
- else
- plproxy_error(func, "Unknown config param: %s", key);
+ set_config_key(func, &cluster->config, key, val);
}
return 0;
}
+/* allocate memory for cluster partitions */
+static void
+allocate_cluster_partitions(ProxyCluster *cluster, int nparts)
+{
+ MemoryContext old_ctx;
+
+ /* free old one */
+ if (cluster->conn_list)
+ free_connlist(cluster);
+
+ cluster->part_count = nparts;
+ cluster->part_mask = cluster->part_count - 1;
+
+ /* allocate lists */
+ old_ctx = MemoryContextSwitchTo(cluster_mem);
+
+ cluster->part_map = palloc0(nparts * sizeof(ProxyConnection *));
+ cluster->conn_list = palloc0(nparts * sizeof(ProxyConnection));
+ MemoryContextSwitchTo(old_ctx);
+}
+
/* fetch list of parts */
static int
reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
{
int err,
i;
- ProxyConnection *conn;
char *connstr;
- MemoryContext old_ctx;
TupleDesc desc;
HeapTuple row;
if (SPI_gettypeid(desc, 1) != TEXTOID)
plproxy_error(func, "partition column 1 must be text");
- /* free old one */
- if (cluster->conn_list)
- free_connlist(cluster);
-
- cluster->part_count = SPI_processed;
- cluster->part_mask = cluster->part_count - 1;
-
- /* allocate lists */
- old_ctx = MemoryContextSwitchTo(cluster_mem);
- cluster->part_map = palloc0(SPI_processed * sizeof(ProxyConnection *));
- cluster->conn_list = palloc0(SPI_processed * sizeof(ProxyConnection));
- MemoryContextSwitchTo(old_ctx);
+ allocate_cluster_partitions(cluster, SPI_processed);
/* fill values */
for (i = 0; i < SPI_processed; i++)
if (connstr == NULL)
plproxy_error(func, "connstr must not be NULL");
- conn = add_connection(cluster, connstr);
- cluster->part_map[i] = conn;
+ add_connection(cluster, connstr, i);
}
return 0;
}
+#ifdef PLPROXY_USE_SQLMED
+
+/* extract a partition number from foreign server option */
+static bool
+extract_part_num(const char *partname, int *part_num)
+{
+ char *partition_tags[] = { "p", "partition_", NULL };
+ char **part_tag;
+ char *errptr;
+
+ for (part_tag = partition_tags; *part_tag; part_tag++)
+ {
+ if (strstr(partname, *part_tag) == partname)
+ {
+ *part_num = (int) strtoul(partname + strlen(*part_tag), &errptr, 10);
+ if (*errptr == '\0')
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/*
+ * Validate the generic option given to servers or user mappings defined with
+ * plproxy foreign data wrapper. Raise an ERROR if the option or its value is
+ * considered invalid.
+ */
+Datum
+plproxy_fdw_validator(PG_FUNCTION_ARGS)
+{
+ List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+ Oid catalog = PG_GETARG_OID(1);
+ ListCell *cell;
+ int part_count = 0;
+
+ /* Pre 8.4.3 databases have broken validator interface, warn the user */
+ if (catalog == InvalidOid)
+ {
+ ereport(NOTICE,
+ (errcode(ERRCODE_WARNING),
+ errmsg("Pl/Proxy: foreign data wrapper validator disabled"),
+ errhint("validator is usable starting from PostgreSQL version 8.4.3")));
+
+ PG_RETURN_BOOL(false);
+ }
+
+ foreach(cell, options_list)
+ {
+ DefElem *def = lfirst(cell);
+ char *arg = strVal(def->arg);
+ int part_num;
+
+ if (catalog == ForeignServerRelationId)
+ {
+ if (extract_part_num(def->defname, &part_num))
+ {
+ /* partition definition */
+ if (part_num != part_count)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Pl/Proxy: partitions must be numbered consecutively"),
+ errhint("next valid partition number is %d", part_count)));
+ ++part_count;
+ }
+ else
+ {
+ const char **opt;
+
+ /* see that a valid config option is specified */
+ for (opt = cluster_config_options; *opt; opt++)
+ {
+ if (pg_strcasecmp(*opt, def->defname) == 0)
+ break;
+ }
+
+ if (*opt == NULL)
+ elog(ERROR, "Pl/Proxy: invalid server option: %s", def->defname);
+ else if (strspn(arg, "0123456789") != strlen(arg))
+ elog(ERROR, "Pl/Proxy: only integer options are allowed: %s=%s",
+ def->defname, arg);
+ }
+ }
+ else if (catalog == UserMappingRelationId)
+ {
+ /* user mapping only accepts "user" and "password" */
+ if (pg_strcasecmp(def->defname, "user") != 0 &&
+ pg_strcasecmp(def->defname, "password") != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Pl/Proxy: invalid option to user mapping"),
+ errhint("valid options are \"user\" and \"password\"")));
+ }
+ }
+ else if (catalog == ForeignDataWrapperRelationId)
+ {
+ /* At the moment there are no options to the fdw itself */
+ elog(WARNING, "Pl/Proxy: foreign data wrapper takes no options");
+ }
+ }
+
+ if (catalog == ForeignServerRelationId)
+ {
+ if (!check_valid_partcount(part_count))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Pl/Proxy: invalid number of partitions"),
+ errhint("the number of partitions in a cluster must be power of 2 (attempted %d)", part_count)));
+ }
+
+ PG_RETURN_BOOL(true);
+}
+
+/*
+ * Reload the cluster configuration and partitions from SQL/MED catalogs.
+ */
+static void
+reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster,
+ ForeignServer *foreign_server)
+{
+ UserMapping *user_mapping;
+ ForeignDataWrapper *fdw;
+ HeapTuple tup;
+ AclResult aclresult;
+ StringInfo user_options;
+ ListCell *cell;
+ Oid userid;
+ int part_count = 0;
+ int part_num;
+
+
+ userid = GetSessionUserId();
+ user_mapping = GetUserMapping(userid, foreign_server->serverid);
+ fdw = GetForeignDataWrapper(foreign_server->fdwid);
+
+ /*
+ * Look up the server and user mapping TIDs for handling syscache invalidations.
+ */
+ tup = SearchSysCache(FOREIGNSERVEROID,
+ ObjectIdGetDatum(foreign_server->serverid),
+ 0, 0, 0);
+
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for foreign server %u", foreign_server->serverid);
+
+ cluster->clusterTupleId = tup->t_self;
+ ReleaseSysCache(tup);
+
+ tup = SearchSysCache(USERMAPPINGUSERSERVER,
+ ObjectIdGetDatum(user_mapping->userid),
+ ObjectIdGetDatum(foreign_server->serverid),
+ 0, 0);
+
+ if (!HeapTupleIsValid(tup))
+ {
+ /* Specific mapping not found, try PUBLIC */
+ tup = SearchSysCache(USERMAPPINGUSERSERVER,
+ ObjectIdGetDatum(InvalidOid),
+ ObjectIdGetDatum(foreign_server->serverid),
+ 0, 0);
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for user mapping (%u,%u)",
+ user_mapping->userid, foreign_server->serverid);
+ }
+
+ cluster->umTupleId = tup->t_self;
+ ReleaseSysCache(tup);
+
+ /*
+ * Check permissions, user must have usage on the server.
+ */
+ aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, userid, ACL_USAGE);
+ if (aclresult != ACLCHECK_OK)
+ aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
+
+ /* Extract the common connect string elements from user mapping */
+ user_options = makeStringInfo();
+ foreach(cell, user_mapping->options)
+ {
+ DefElem *def = lfirst(cell);
+
+ appendStringInfo(user_options, "%s='%s' ", def->defname, strVal(def->arg));
+ }
+
+ /*
+ * Collect the cluster configuration and partition definitions from foreign
+ * server options. At first pass just collect the cluster options and count
+ * the number of partitions.
+ */
+ foreach(cell, foreign_server->options)
+ {
+ DefElem *def = lfirst(cell);
+
+ if (extract_part_num(def->defname, &part_num))
+ {
+ if (part_num != part_count)
+ plproxy_error(func, "partitions numbers must be consecutive");
+
+ part_count++;
+ }
+ else
+ set_config_key(func, &cluster->config, def->defname, strVal(def->arg));
+ }
+
+ if (!check_valid_partcount(part_count))
+ plproxy_error(func, "invalid partition count");
+
+ /*
+ * Now that the partition count is known, allocate the partitions and make
+ * a second pass over the options adding each connstr to cluster.
+ */
+ allocate_cluster_partitions(cluster, part_count);
+
+ foreach(cell, foreign_server->options)
+ {
+ DefElem *def = lfirst(cell);
+ StringInfo buf = makeStringInfo();
+
+ if (!extract_part_num(def->defname, &part_num))
+ continue;
+
+ appendStringInfo(buf, "%s%s%s", strVal(def->arg),
+ user_options->len ? " " : "",
+ user_options->data);
+ add_connection(cluster, buf->data, part_num);
+ }
+}
+
+/*
+ * We have no SQL/MED definition for the cluster, determine if the
+ * cluster is defined using the compat functions. Raise an ERROR
+ * if the plproxy schema functions don't exist.
+ */
+static void
+determine_compat_mode(ProxyCluster *cluster)
+{
+ bool have_compat = false;
+ HeapTuple tup;
+
+ /*
+ * See that we have plproxy schema and all the necessary functions
+ */
+
+ tup = SearchSysCache(NAMESPACENAME, PointerGetDatum("plproxy"), 0, 0, 0);
+ if (HeapTupleIsValid(tup))
+ {
+ Oid namespaceId = HeapTupleGetOid(tup);
+ Oid paramOids[] = { TEXTOID };
+ oidvector *parameterTypes = buildoidvector(paramOids, 1);
+ const char **funcname;
+
+ /* All of the functions required to run pl/proxy in compat mode */
+ static const char *compat_functions[] = {
+ "get_cluster_version",
+ "get_cluster_config",
+ "get_cluster_partitions",
+ NULL
+ };
+
+ for (funcname = compat_functions; *funcname; funcname++)
+ {
+ if (!SearchSysCacheExists(PROCNAMEARGSNSP,
+ PointerGetDatum(*funcname),
+ PointerGetDatum(parameterTypes),
+ ObjectIdGetDatum(namespaceId),
+ 0))
+ break;
+ }
+
+ /* we have the schema and all the functions - use compat */
+ if (! *funcname)
+ have_compat = true;
+
+ ReleaseSysCache(tup);
+ }
+
+ if (!have_compat)
+ elog(ERROR, "Pl/Proxy: cluster not found: %s", cluster->name);
+}
+
+/*
+ * Syscache inval callback function for foreign servers and user mappings.
+ *
+ * Note: this invalidates compat clusters on any foreign server change. This
+ * allows SQL/MED clusters to override those defined by plproxy schema
+ * functions.
+ */
+static void
+ClusterSyscacheCallback(Datum arg, int cacheid, ItemPointer tuplePtr)
+{
+ ProxyCluster *cluster;
+
+ for (cluster = cluster_list; cluster; cluster = cluster->next)
+ {
+ if (cluster->needs_reload)
+ {
+ /* already invalidated */
+ continue;
+ }
+ else if (!tuplePtr)
+ {
+ /* invalidate all */
+ cluster->needs_reload = true;
+ }
+ else if (!cluster->sqlmed_cluster)
+ {
+ /* allow new SQL/MED servers to override compat definitions */
+ cluster->needs_reload = (cacheid == FOREIGNSERVEROID);
+ }
+ else if (cacheid == USERMAPPINGOID)
+ {
+ /* user mappings changed */
+ cluster->needs_reload = ItemPointerEquals(tuplePtr, &cluster->umTupleId);
+ }
+ else if (cacheid == FOREIGNSERVEROID)
+ {
+ /* server definitions changed */
+ cluster->needs_reload = ItemPointerEquals(tuplePtr, &cluster->clusterTupleId);
+ }
+ }
+}
+
+#endif
+
+/*
+ * Register syscache invalidation callbacks for SQL/MED clusters.
+ */
+void
+plproxy_syscache_callback_init(void)
+{
+#ifdef PLPROXY_USE_SQLMED
+ CacheRegisterSyscacheCallback(FOREIGNSERVEROID, ClusterSyscacheCallback, (Datum) 0);
+ CacheRegisterSyscacheCallback(USERMAPPINGOID, ClusterSyscacheCallback, (Datum) 0);
+#endif
+}
+
+/*
+ * Reload the cluster configuration and partitions from plproxy.get_cluster*
+ * functions.
+ */
+static void
+reload_plproxy_cluster(ProxyFunction *func, ProxyCluster *cluster)
+{
+ Datum dname = DirectFunctionCall1(textin, CStringGetDatum(cluster->name));
+ int cur_version;
+
+ plproxy_cluster_plan_init();
+
+ /* fetch serial, also check if exists */
+ cur_version = get_version(func, dname);
+
+ /* update if needed */
+ if (cur_version != cluster->version || cluster->needs_reload)
+ {
+ reload_parts(cluster, dname, func);
+ get_config(cluster, dname, func);
+ cluster->version = cur_version;
+ }
+}
+
/* allocate new cluster */
static ProxyCluster *
new_cluster(const char *name)
cluster = palloc0(sizeof(*cluster));
cluster->name = pstrdup(name);
+ cluster->needs_reload = true;
MemoryContextSwitchTo(old_ctx);
return cluster;
}
+/*
+ * Refresh the cluster.
+ */
+static void
+refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
+{
+#ifdef PLPROXY_USE_SQLMED
+ if (cluster->needs_reload)
+ {
+ ForeignServer *server;
+
+ /*
+ * Determine if this is a SQL/MED server name or a pl/proxy compat cluster.
+ * Fallback to plproxy.get_cluster*() functions if a foreign server is not
+ * found.
+ */
+ server = GetForeignServerByName(cluster->name, true);
+ cluster->sqlmed_cluster = (server != NULL);
+
+ if (!cluster->sqlmed_cluster)
+ determine_compat_mode(cluster);
+ else
+ reload_sqlmed_cluster(func, cluster, server);
+ }
+#endif
+
+ /* Either no SQL/MED support or no such foreign server */
+ if (!cluster->sqlmed_cluster)
+ reload_plproxy_cluster(func, cluster);
+
+ cluster->needs_reload = false;
+}
+
/*
* Get cached or create new fake cluster.
*/
plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
{
ProxyCluster *cluster;
- int cur_version;
const char *name;
- Datum dname;
/* functions used CONNECT with query */
if (func->connect_str)
return fake_cluster(func, func->connect_str);
- /* initialize plans on demand only */
- plproxy_cluster_plan_init();
-
+ /* Cluster statement, either a lookup function or a name */
if (func->cluster_sql)
name = resolve_query(func, fcinfo, func->cluster_sql);
else
name = func->cluster_name;
- /* create Datum for name */
- dname = DirectFunctionCall1(textin, CStringGetDatum(name));
-
- /* fetch serial, also check if exists */
- cur_version = get_version(func, dname);
-
/* search if cached */
for (cluster = cluster_list; cluster; cluster = cluster->next)
{
cluster_list = cluster;
}
- /* update if needed */
- if (cur_version != cluster->version)
- {
- reload_parts(cluster, dname, func);
- get_config(cluster, dname, func);
- cluster->version = cur_version;
- }
+ /* determine cluster type, reload parts if necessary */
+ refresh_cluster(func, cluster);
return cluster;
}