From 471eb17d9b905ab8649f55bc947b7029479c9496 Mon Sep 17 00:00:00 2001 From: Martin Pihlak Date: Tue, 5 Jan 2010 15:06:24 +0200 Subject: [PATCH] Enable pl/proxy clusters to be defined by SQL/MED facilities. Provide a plproxy foreign data wrapper and a function for validating cluster definitions. It is still possible to define clusters using configuration functions, however SQL/MED cluster definition takes precedence if available. --- Makefile | 21 +- doc/config.txt | 70 +++- doc/todo.txt | 2 - doc/tutorial.txt | 35 ++ expected/plproxy_sqlmed.out | 85 +++++ plproxy_fdw.sql | 7 + plproxy.sql.in => plproxy_lang.sql | 0 sql/plproxy_sqlmed.sql | 77 +++++ src/cluster.c | 520 ++++++++++++++++++++++++++--- src/main.c | 1 + src/plproxy.h | 23 +- 11 files changed, 776 insertions(+), 65 deletions(-) create mode 100644 expected/plproxy_sqlmed.out create mode 100644 plproxy_fdw.sql rename plproxy.sql.in => plproxy_lang.sql (100%) create mode 100644 sql/plproxy_sqlmed.sql diff --git a/Makefile b/Makefile index 13a647b..3a9cc84 100644 --- a/Makefile +++ b/Makefile @@ -6,13 +6,17 @@ PLPROXY_VERSION = 2.1-cvs PQINC = $(shell pg_config --includedir) PQLIB = $(shell pg_config --libdir) +# PostgreSQL version +PGVER = $(shell pg_config --version | sed 's/PostgreSQL //') +SQLMED = $(shell test $(PGVER) "<" "8.4" && echo "false" || echo "true") + # module setup MODULE_big = plproxy SRCS = src/cluster.c src/execute.c src/function.c src/main.c \ src/query.c src/result.c src/type.c src/poll_compat.c OBJS = src/scanner.o src/parser.tab.o $(SRCS:.c=.o) DATA_built = plproxy.sql -EXTRA_CLEAN = src/scanner.[ch] src/parser.tab.[ch] +EXTRA_CLEAN = src/scanner.[ch] src/parser.tab.[ch] plproxy.sql.in PG_CPPFLAGS = -I$(PQINC) SHLIB_LINK = -L$(PQLIB) -lpq @@ -23,13 +27,23 @@ DIST_FILES = Makefile src/plproxy.h src/rowstamp.h src/scanner.l src/parser.y \ config/simple.config.sql src/poll_compat.h \ doc/Makefile doc/config.txt doc/overview.txt doc/faq.txt \ doc/syntax.txt doc/todo.txt doc/tutorial.txt \ - AUTHORS COPYRIGHT README plproxy.sql.in NEWS \ + AUTHORS COPYRIGHT README plproxy_lang.sql plproxy_fdw.sql NEWS \ debian/packages debian/changelog # regression testing setup REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \ plproxy_errors plproxy_clustermap plproxy_dynamic_record \ plproxy_encoding plproxy_split + +# SQL files +PLPROXY_SQL = plproxy_lang.sql + +# SQL/MED available, add foreign data wrapper and regression tests +ifeq ($(SQLMED), true) +REGRESS += plproxy_sqlmed +PLPROXY_SQL += plproxy_fdw.sql +endif + REGRESS_OPTS = --load-language=plpgsql # load PGXS makefile @@ -54,6 +68,9 @@ src/parser.tab.c: src/parser.y src/scanner.c: src/scanner.l cd src; $(FLEX) -oscanner.c scanner.l +plproxy.sql.in: $(PLPROXY_SQL) + cat $^ > $@ + # dependencies $(OBJS): src/plproxy.h src/rowstamp.h src/execute.o: src/poll_compat.h diff --git a/doc/config.txt b/doc/config.txt index 9f9ba8a..41f1873 100644 --- a/doc/config.txt +++ b/doc/config.txt @@ -1,5 +1,5 @@ -= PL/Proxy Cluster Configuration API = += PL/Proxy Cluster Configuration = PL/Proxy can be used in either CONNECT mode or CLUSTER mode. @@ -10,12 +10,15 @@ to a database it will proxy the request to. PL/Proxy can also be used in CLUSTER mode where it provides support for partitioning data across multiple databases based on a clustering function. -When using PL/Proxy in CONNECT mode no configuration functions are required. -However, using PL/Proxy in CLUSTER mode requires the following configuration -functions to be defined. +When using PL/Proxy in CONNECT mode no special configuration is required. +However, using PL/Proxy in CLUSTER mode requires the cluster configuration +to be defined, either by the cluster configuration API or SQL/MED. +== Cluster configuration API == -== plproxy.get_cluster_version(cluster_name) == +The following plproxy schema functions are used to define the clusters: + +=== plproxy.get_cluster_version(cluster_name) === plproxy.get_cluster_version(cluster_name text) returns integer @@ -41,7 +44,7 @@ external source such as a configuration table. -== plproxy.get_cluster_partitions(cluster_name) == +=== plproxy.get_cluster_partitions(cluster_name) === plproxy.get_cluster_partitions(cluster_name text) returns setof text @@ -93,7 +96,7 @@ An example function without the use of separate configuration tables: END; $$ LANGUAGE plpgsql; -== plproxy.get_cluster_config(cluster) == +=== plproxy.get_cluster_config(cluster) === plproxy.get_cluster_config(in cluster_name text, out key text, out val text) @@ -149,4 +152,57 @@ Example function without the use of separate tables for storing parameters. END; $$ LANGUAGE plpgsql; +== SQL/MED cluster definitions == + +Pl/Proxy can take advantage of SQL/MED connection info management available +in PostgreSQL 8.4 and above. The benefits of using SQL/MED are simplified +cluster definition management and slightly improved performance. + +Both SQL/MED defined clusters and configuration function based clusters can +coexist in the same database. If a cluster is defined in both, SQL/MED takes +precedence. If no SQL/MED cluster is found an attempt is made to fall back to +configuration functions. + +=== Configuring SQL/MED clusters === + +First we need to create a foreign data wrapper. Generally the FDW is a kind of +driver that provides remote database access, data marshalling etc. In this +case, it's main role is to provide a validation function that sanity checks +your cluster definitions. + +Note: the validation function is known to be broken in PostgreSQL 8.4.2 and +below. + + CREATE FOREIGN DATA WRAPPER plproxy [ VALIDATOR plproxy_fdw_validator ]; + +Next we need to define a CLUSTER, this is done by creating a SERVER that uses +the plproxy FDW. The options to the SERVER are Pl/Proxy configuration settings +and the list of cluster partitions. + +Note: USAGE access to the SERVER must be explicitly granted. Without this, +users are unable to use the cluster. + + CREATE SERVER a_cluster FOREIGN DATA WRAPPER plproxy + OPTIONS ( + connection_lifetime '1800', + disable_binary '1', + p0 'dbname=part00 hostname=127.0.0.1', + p1 'dbname=part01 hostname=127.0.0.1', + p2 'dbname=part02 hostname=127.0.0.1', + p3 'dbname=part03 hostname=127.0.0.1' + ); + +Finally we need to create a user mapping for the Pl/Proxy users. One might +create individual mappings for specific users: + + CREATE USER MAPPING FOR bob SERVER a_cluster + OPTIONS (user 'bob', password 'secret'); + +or create a PUBLIC mapping for all users of the system: + + CREATE USER MAPPING FOR public SERVER a_cluster + OPTIONS (user 'plproxy', password 'foo'); + +Also it is possible to create both individual and PUBLIC mapping, in this case +the individual mapping takes precedence. diff --git a/doc/todo.txt b/doc/todo.txt index e8f9e5b..63b38b4 100644 --- a/doc/todo.txt +++ b/doc/todo.txt @@ -9,8 +9,6 @@ * Lazy value type cache, per-conn binary I/O decision. - * named connactions (ala SQL-MED) - == Just thoughts == * Drop plproxy.get_cluster_config()... diff --git a/doc/tutorial.txt b/doc/tutorial.txt index b2a6351..759447b 100644 --- a/doc/tutorial.txt +++ b/doc/tutorial.txt @@ -60,6 +60,8 @@ data with PL/Proxy. == Create configuration functions == Using PL/Proxy for partitioning requires setting up some configuration functions. +Alternatively, if you are running PostgreSQL 8.4 or above you can take advantage +of the SQL/MED connection management facilities. See below. When a query needs to be forwarded to a remote database the function plproxy.get_cluster_partitions(cluster) is invoked by plproxy to get the @@ -117,6 +119,39 @@ on what this function can do. The config section contains more information on all of these functions. +== Configuring Pl/Proxy clusters with SQL/MED == + +First we need a foreign data wrapper. This is mostly a placeholder, but can +be extended with a validator function to verify the cluster definition. +See http://www.postgresql.org/docs/8.4/static/sql-createforeigndatawrapper.html +for additional details of how to manage the SQL/MED catalog. + + CREATE FOREIGN DATA WRAPPER plproxy; + +Then the actual cluster with its configuration options and partitions: + + CREATE SERVER usercluster FOREIGN DATA WRAPPER plproxy + OPTIONS ( connection_lifetime '1800', + p0 'dbname=part00 host=127.0.0.1', + p1 'dbname=part01 host=127.0.0.1' ); + +We also need a user mapping that maps local PostgreSQL users to remote +partitions. It is possible to create PUBLIC mapping that applies for +all users in the local system: + + CREATE USER MAPPING FOR PUBLIC SERVER usercluster; + +Or a private mapping that can only be used by specific users: + + CREATE USER MAPPING FOR bob SERVER usercluster + OPTIONS (user 'plproxy', password 'salakala'); + +Finally we need to grant USAGE on the cluster to specific users: + + GRANT USAGE ON SERVER usercluster TO bob; + + + == Partitioned remote call == Here we assume that the user table is spread over several databases based diff --git a/expected/plproxy_sqlmed.out b/expected/plproxy_sqlmed.out new file mode 100644 index 0000000..9d831e5 --- /dev/null +++ b/expected/plproxy_sqlmed.out @@ -0,0 +1,85 @@ +drop foreign data wrapper if exists plproxy cascade; +create foreign data wrapper plproxy; +create server sqlmedcluster foreign data wrapper plproxy + options ( partition_0 'dbname=test_part3 host=localhost', + partition_1 'dbname=test_part2 host=localhost', + partition_2 'dbname=test_part1 host=localhost', + partition_3 'dbname=test_part0 host=localhost'); +create or replace function sqlmed_test1() returns setof text as $$ + cluster 'sqlmedcluster'; + run on 0; + select 'plproxy: user=' || current_user || ' dbname=' || current_database(); +$$ language plproxy; +drop user if exists test_user_bob; +create user test_user_bob password 'secret'; +-- no user mapping +set session authorization test_user_bob; +select * from sqlmed_test1(); +ERROR: user mapping not found for "test_user_bob" +reset session authorization; +-- add a public user mapping +create user mapping for public server sqlmedcluster + options ( user 'test_user_bob', + password 'secret1'); +-- no access to foreign server +set session authorization test_user_bob; +select * from sqlmed_test1(); +ERROR: permission denied for foreign server sqlmedcluster +reset session authorization; +-- ok, access granted +grant usage on foreign server sqlmedcluster to test_user_bob; +set session authorization test_user_bob; +select * from sqlmed_test1(); + sqlmed_test1 +----------------------------------------------- + plproxy: user=test_user_bob dbname=test_part3 +(1 row) + +reset session authorization; +-- cluster definition validation +-- partition numbers must be consecutive +alter server sqlmedcluster options (drop partition_2); +select * from sqlmed_test1(); +ERROR: PL/Proxy function public.sqlmed_test1(0): partitions numbers must be consecutive +-- invalid partition count +alter server sqlmedcluster options + (drop partition_3, + add partition_2 'dbname=test_part1 host=localhost'); +select * from sqlmed_test1(); +ERROR: PL/Proxy function public.sqlmed_test1(0): invalid partition count +-- switching betweem SQL/MED and compat mode +create or replace function sqlmed_compat_test() returns setof text as $$ + cluster 'testcluster'; + run on 0; + select 'plproxy: part=' || current_database(); +$$ language plproxy; +-- testcluster +select * from sqlmed_compat_test(); + sqlmed_compat_test +-------------------------- + plproxy: part=test_part0 +(1 row) + +-- override the test cluster with a SQL/MED definition +drop server if exists testcluster cascade; +NOTICE: server "testcluster" does not exist, skipping +create server testcluster foreign data wrapper plproxy + options (partition_0 'dbname=regression host=localhost'); +create user mapping for public server testcluster; +-- sqlmed testcluster +select * from sqlmed_compat_test(); + sqlmed_compat_test +-------------------------- + plproxy: part=regression +(1 row) + +-- now drop the SQL/MED testcluster, and test fallback +drop server testcluster cascade; +NOTICE: drop cascades to user mapping for public +-- back on testcluster again +select * from sqlmed_compat_test(); + sqlmed_compat_test +-------------------------- + plproxy: part=test_part0 +(1 row) + diff --git a/plproxy_fdw.sql b/plproxy_fdw.sql new file mode 100644 index 0000000..bf23f74 --- /dev/null +++ b/plproxy_fdw.sql @@ -0,0 +1,7 @@ +-- validator function +CREATE FUNCTION plproxy_fdw_validator (text[], oid) +RETURNS boolean AS 'MODULE_PATHNAME' LANGUAGE C; + +-- foreign data wrapper +CREATE FOREIGN DATA WRAPPER plproxy VALIDATOR plproxy_fdw_validator; + diff --git a/plproxy.sql.in b/plproxy_lang.sql similarity index 100% rename from plproxy.sql.in rename to plproxy_lang.sql diff --git a/sql/plproxy_sqlmed.sql b/sql/plproxy_sqlmed.sql new file mode 100644 index 0000000..9ed64ac --- /dev/null +++ b/sql/plproxy_sqlmed.sql @@ -0,0 +1,77 @@ +drop foreign data wrapper if exists plproxy cascade; +create foreign data wrapper plproxy; + +create server sqlmedcluster foreign data wrapper plproxy + options ( partition_0 'dbname=test_part3 host=localhost', + partition_1 'dbname=test_part2 host=localhost', + partition_2 'dbname=test_part1 host=localhost', + partition_3 'dbname=test_part0 host=localhost'); + +create or replace function sqlmed_test1() returns setof text as $$ + cluster 'sqlmedcluster'; + run on 0; + select 'plproxy: user=' || current_user || ' dbname=' || current_database(); +$$ language plproxy; + +drop user if exists test_user_bob; +create user test_user_bob password 'secret'; + +-- no user mapping +set session authorization test_user_bob; +select * from sqlmed_test1(); +reset session authorization; + +-- add a public user mapping +create user mapping for public server sqlmedcluster + options ( user 'test_user_bob', + password 'secret1'); + +-- no access to foreign server +set session authorization test_user_bob; +select * from sqlmed_test1(); +reset session authorization; + +-- ok, access granted +grant usage on foreign server sqlmedcluster to test_user_bob; +set session authorization test_user_bob; +select * from sqlmed_test1(); +reset session authorization; + +-- cluster definition validation + +-- partition numbers must be consecutive +alter server sqlmedcluster options (drop partition_2); +select * from sqlmed_test1(); + +-- invalid partition count +alter server sqlmedcluster options + (drop partition_3, + add partition_2 'dbname=test_part1 host=localhost'); +select * from sqlmed_test1(); + +-- switching betweem SQL/MED and compat mode + +create or replace function sqlmed_compat_test() returns setof text as $$ + cluster 'testcluster'; + run on 0; + select 'plproxy: part=' || current_database(); +$$ language plproxy; + +-- testcluster +select * from sqlmed_compat_test(); + +-- override the test cluster with a SQL/MED definition +drop server if exists testcluster cascade; +create server testcluster foreign data wrapper plproxy + options (partition_0 'dbname=regression host=localhost'); +create user mapping for public server testcluster; + +-- sqlmed testcluster +select * from sqlmed_compat_test(); + +-- now drop the SQL/MED testcluster, and test fallback +drop server testcluster cascade; + +-- back on testcluster again +select * from sqlmed_compat_test(); + diff --git a/src/cluster.c b/src/cluster.c index 3feb063..7a85f36 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -61,6 +61,21 @@ static const char part_sql[] = "select * from plproxy.get_cluster_partitions($1) /* query for fetching cluster config */ static const char config_sql[] = "select * from plproxy.get_cluster_config($1)"; +#ifdef PLPROXY_USE_SQLMED + +/* list of all the valid configuration options to plproxy cluster */ +static const char *cluster_config_options[] = { + "statement_timeout", + "connection_lifetime", + "query_timeout", + "disable_binary", + NULL +}; + +extern Datum plproxy_fdw_validator(PG_FUNCTION_ARGS); +PG_FUNCTION_INFO_V1(plproxy_fdw_validator); + +#endif /* * Connsetion count should be non-zero and power of 2. @@ -161,10 +176,10 @@ free_connlist(ProxyCluster *cluster) * Add new database connection if it does not exists. */ static ProxyConnection * -add_connection(ProxyCluster *cluster, char *connstr) +add_connection(ProxyCluster *cluster, char *connstr, int part_num) { int i; - ProxyConnection *conn; + ProxyConnection *conn = NULL; char *username; StringInfo final; @@ -179,16 +194,22 @@ add_connection(ProxyCluster *cluster, char *connstr) } /* check if already have it */ - for (i = 0; i < cluster->conn_count; i++) + for (i = 0; i < cluster->conn_count && !conn; i++) { - conn = &cluster->conn_list[i]; - if (strcmp(conn->connstr, final->data) == 0) - return conn; + ProxyConnection *c = &cluster->conn_list[i]; + + if (strcmp(c->connstr, final->data) == 0) + conn = c; } /* add new connection */ - conn = &cluster->conn_list[cluster->conn_count++]; - conn->connstr = MemoryContextStrdup(cluster_mem, final->data); + if (!conn) + { + conn = &cluster->conn_list[cluster->conn_count++]; + conn->connstr = MemoryContextStrdup(cluster_mem, final->data); + } + + cluster->part_map[part_num] = conn; return conn; } @@ -223,6 +244,22 @@ get_version(ProxyFunction *func, Datum dname) return DatumGetInt32(bin_val); } +/* set a configuration option. */ +static void +set_config_key(ProxyFunction *func, ProxyConfig *cf, const char *key, const char *val) +{ + if (pg_strcasecmp(key, "statement_timeout") == 0) + /* ignore */ ; + else if (pg_strcasecmp("connection_lifetime", key) == 0) + cf->connection_lifetime = atoi(val); + else if (pg_strcasecmp("query_timeout", key) == 0) + cf->query_timeout = atoi(val); + else if (pg_strcasecmp("disable_binary", key) == 0) + cf->disable_binary = atoi(val); + else + plproxy_error(func, "Unknown config param: %s", key); +} + /* * Fetch cluster configuration. */ @@ -234,7 +271,6 @@ get_config(ProxyCluster *cluster, Datum dname, ProxyFunction *func) TupleDesc desc; const char *key, *val; - ProxyConfig *cf = &cluster->config; /* run query */ err = SPI_execute_plan(config_plan, &dname, NULL, false, 0); @@ -263,30 +299,40 @@ get_config(ProxyCluster *cluster, Datum dname, ProxyFunction *func) if (val == NULL) plproxy_error(func, "val must not be NULL"); - if (pg_strcasecmp(key, "statement_timeout") == 0) - /* ignore */ ; - else if (pg_strcasecmp("connection_lifetime", key) == 0) - cf->connection_lifetime = atoi(val); - else if (pg_strcasecmp("query_timeout", key) == 0) - cf->query_timeout = atoi(val); - else if (pg_strcasecmp("disable_binary", key) == 0) - cf->disable_binary = atoi(val); - else - plproxy_error(func, "Unknown config param: %s", key); + set_config_key(func, &cluster->config, key, val); } return 0; } +/* allocate memory for cluster partitions */ +static void +allocate_cluster_partitions(ProxyCluster *cluster, int nparts) +{ + MemoryContext old_ctx; + + /* free old one */ + if (cluster->conn_list) + free_connlist(cluster); + + cluster->part_count = nparts; + cluster->part_mask = cluster->part_count - 1; + + /* allocate lists */ + old_ctx = MemoryContextSwitchTo(cluster_mem); + + cluster->part_map = palloc0(nparts * sizeof(ProxyConnection *)); + cluster->conn_list = palloc0(nparts * sizeof(ProxyConnection)); + MemoryContextSwitchTo(old_ctx); +} + /* fetch list of parts */ static int reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func) { int err, i; - ProxyConnection *conn; char *connstr; - MemoryContext old_ctx; TupleDesc desc; HeapTuple row; @@ -304,18 +350,7 @@ reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func) if (SPI_gettypeid(desc, 1) != TEXTOID) plproxy_error(func, "partition column 1 must be text"); - /* free old one */ - if (cluster->conn_list) - free_connlist(cluster); - - cluster->part_count = SPI_processed; - cluster->part_mask = cluster->part_count - 1; - - /* allocate lists */ - old_ctx = MemoryContextSwitchTo(cluster_mem); - cluster->part_map = palloc0(SPI_processed * sizeof(ProxyConnection *)); - cluster->conn_list = palloc0(SPI_processed * sizeof(ProxyConnection)); - MemoryContextSwitchTo(old_ctx); + allocate_cluster_partitions(cluster, SPI_processed); /* fill values */ for (i = 0; i < SPI_processed; i++) @@ -326,13 +361,373 @@ reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func) if (connstr == NULL) plproxy_error(func, "connstr must not be NULL"); - conn = add_connection(cluster, connstr); - cluster->part_map[i] = conn; + add_connection(cluster, connstr, i); } return 0; } +#ifdef PLPROXY_USE_SQLMED + +/* extract a partition number from foreign server option */ +static bool +extract_part_num(const char *partname, int *part_num) +{ + char *partition_tags[] = { "p", "partition_", NULL }; + char **part_tag; + char *errptr; + + for (part_tag = partition_tags; *part_tag; part_tag++) + { + if (strstr(partname, *part_tag) == partname) + { + *part_num = (int) strtoul(partname + strlen(*part_tag), &errptr, 10); + if (*errptr == '\0') + return true; + } + } + + return false; +} + +/* + * Validate the generic option given to servers or user mappings defined with + * plproxy foreign data wrapper. Raise an ERROR if the option or its value is + * considered invalid. + */ +Datum +plproxy_fdw_validator(PG_FUNCTION_ARGS) +{ + List *options_list = untransformRelOptions(PG_GETARG_DATUM(0)); + Oid catalog = PG_GETARG_OID(1); + ListCell *cell; + int part_count = 0; + + /* Pre 8.4.3 databases have broken validator interface, warn the user */ + if (catalog == InvalidOid) + { + ereport(NOTICE, + (errcode(ERRCODE_WARNING), + errmsg("Pl/Proxy: foreign data wrapper validator disabled"), + errhint("validator is usable starting from PostgreSQL version 8.4.3"))); + + PG_RETURN_BOOL(false); + } + + foreach(cell, options_list) + { + DefElem *def = lfirst(cell); + char *arg = strVal(def->arg); + int part_num; + + if (catalog == ForeignServerRelationId) + { + if (extract_part_num(def->defname, &part_num)) + { + /* partition definition */ + if (part_num != part_count) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Pl/Proxy: partitions must be numbered consecutively"), + errhint("next valid partition number is %d", part_count))); + ++part_count; + } + else + { + const char **opt; + + /* see that a valid config option is specified */ + for (opt = cluster_config_options; *opt; opt++) + { + if (pg_strcasecmp(*opt, def->defname) == 0) + break; + } + + if (*opt == NULL) + elog(ERROR, "Pl/Proxy: invalid server option: %s", def->defname); + else if (strspn(arg, "0123456789") != strlen(arg)) + elog(ERROR, "Pl/Proxy: only integer options are allowed: %s=%s", + def->defname, arg); + } + } + else if (catalog == UserMappingRelationId) + { + /* user mapping only accepts "user" and "password" */ + if (pg_strcasecmp(def->defname, "user") != 0 && + pg_strcasecmp(def->defname, "password") != 0) + { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Pl/Proxy: invalid option to user mapping"), + errhint("valid options are \"user\" and \"password\""))); + } + } + else if (catalog == ForeignDataWrapperRelationId) + { + /* At the moment there are no options to the fdw itself */ + elog(WARNING, "Pl/Proxy: foreign data wrapper takes no options"); + } + } + + if (catalog == ForeignServerRelationId) + { + if (!check_valid_partcount(part_count)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Pl/Proxy: invalid number of partitions"), + errhint("the number of partitions in a cluster must be power of 2 (attempted %d)", part_count))); + } + + PG_RETURN_BOOL(true); +} + +/* + * Reload the cluster configuration and partitions from SQL/MED catalogs. + */ +static void +reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster, + ForeignServer *foreign_server) +{ + UserMapping *user_mapping; + ForeignDataWrapper *fdw; + HeapTuple tup; + AclResult aclresult; + StringInfo user_options; + ListCell *cell; + Oid userid; + int part_count = 0; + int part_num; + + + userid = GetSessionUserId(); + user_mapping = GetUserMapping(userid, foreign_server->serverid); + fdw = GetForeignDataWrapper(foreign_server->fdwid); + + /* + * Look up the server and user mapping TIDs for handling syscache invalidations. + */ + tup = SearchSysCache(FOREIGNSERVEROID, + ObjectIdGetDatum(foreign_server->serverid), + 0, 0, 0); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for foreign server %u", foreign_server->serverid); + + cluster->clusterTupleId = tup->t_self; + ReleaseSysCache(tup); + + tup = SearchSysCache(USERMAPPINGUSERSERVER, + ObjectIdGetDatum(user_mapping->userid), + ObjectIdGetDatum(foreign_server->serverid), + 0, 0); + + if (!HeapTupleIsValid(tup)) + { + /* Specific mapping not found, try PUBLIC */ + tup = SearchSysCache(USERMAPPINGUSERSERVER, + ObjectIdGetDatum(InvalidOid), + ObjectIdGetDatum(foreign_server->serverid), + 0, 0); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for user mapping (%u,%u)", + user_mapping->userid, foreign_server->serverid); + } + + cluster->umTupleId = tup->t_self; + ReleaseSysCache(tup); + + /* + * Check permissions, user must have usage on the server. + */ + aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, userid, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername); + + /* Extract the common connect string elements from user mapping */ + user_options = makeStringInfo(); + foreach(cell, user_mapping->options) + { + DefElem *def = lfirst(cell); + + appendStringInfo(user_options, "%s='%s' ", def->defname, strVal(def->arg)); + } + + /* + * Collect the cluster configuration and partition definitions from foreign + * server options. At first pass just collect the cluster options and count + * the number of partitions. + */ + foreach(cell, foreign_server->options) + { + DefElem *def = lfirst(cell); + + if (extract_part_num(def->defname, &part_num)) + { + if (part_num != part_count) + plproxy_error(func, "partitions numbers must be consecutive"); + + part_count++; + } + else + set_config_key(func, &cluster->config, def->defname, strVal(def->arg)); + } + + if (!check_valid_partcount(part_count)) + plproxy_error(func, "invalid partition count"); + + /* + * Now that the partition count is known, allocate the partitions and make + * a second pass over the options adding each connstr to cluster. + */ + allocate_cluster_partitions(cluster, part_count); + + foreach(cell, foreign_server->options) + { + DefElem *def = lfirst(cell); + StringInfo buf = makeStringInfo(); + + if (!extract_part_num(def->defname, &part_num)) + continue; + + appendStringInfo(buf, "%s%s%s", strVal(def->arg), + user_options->len ? " " : "", + user_options->data); + add_connection(cluster, buf->data, part_num); + } +} + +/* + * We have no SQL/MED definition for the cluster, determine if the + * cluster is defined using the compat functions. Raise an ERROR + * if the plproxy schema functions don't exist. + */ +static void +determine_compat_mode(ProxyCluster *cluster) +{ + bool have_compat = false; + HeapTuple tup; + + /* + * See that we have plproxy schema and all the necessary functions + */ + + tup = SearchSysCache(NAMESPACENAME, PointerGetDatum("plproxy"), 0, 0, 0); + if (HeapTupleIsValid(tup)) + { + Oid namespaceId = HeapTupleGetOid(tup); + Oid paramOids[] = { TEXTOID }; + oidvector *parameterTypes = buildoidvector(paramOids, 1); + const char **funcname; + + /* All of the functions required to run pl/proxy in compat mode */ + static const char *compat_functions[] = { + "get_cluster_version", + "get_cluster_config", + "get_cluster_partitions", + NULL + }; + + for (funcname = compat_functions; *funcname; funcname++) + { + if (!SearchSysCacheExists(PROCNAMEARGSNSP, + PointerGetDatum(*funcname), + PointerGetDatum(parameterTypes), + ObjectIdGetDatum(namespaceId), + 0)) + break; + } + + /* we have the schema and all the functions - use compat */ + if (! *funcname) + have_compat = true; + + ReleaseSysCache(tup); + } + + if (!have_compat) + elog(ERROR, "Pl/Proxy: cluster not found: %s", cluster->name); +} + +/* + * Syscache inval callback function for foreign servers and user mappings. + * + * Note: this invalidates compat clusters on any foreign server change. This + * allows SQL/MED clusters to override those defined by plproxy schema + * functions. + */ +static void +ClusterSyscacheCallback(Datum arg, int cacheid, ItemPointer tuplePtr) +{ + ProxyCluster *cluster; + + for (cluster = cluster_list; cluster; cluster = cluster->next) + { + if (cluster->needs_reload) + { + /* already invalidated */ + continue; + } + else if (!tuplePtr) + { + /* invalidate all */ + cluster->needs_reload = true; + } + else if (!cluster->sqlmed_cluster) + { + /* allow new SQL/MED servers to override compat definitions */ + cluster->needs_reload = (cacheid == FOREIGNSERVEROID); + } + else if (cacheid == USERMAPPINGOID) + { + /* user mappings changed */ + cluster->needs_reload = ItemPointerEquals(tuplePtr, &cluster->umTupleId); + } + else if (cacheid == FOREIGNSERVEROID) + { + /* server definitions changed */ + cluster->needs_reload = ItemPointerEquals(tuplePtr, &cluster->clusterTupleId); + } + } +} + +#endif + +/* + * Register syscache invalidation callbacks for SQL/MED clusters. + */ +void +plproxy_syscache_callback_init(void) +{ +#ifdef PLPROXY_USE_SQLMED + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, ClusterSyscacheCallback, (Datum) 0); + CacheRegisterSyscacheCallback(USERMAPPINGOID, ClusterSyscacheCallback, (Datum) 0); +#endif +} + +/* + * Reload the cluster configuration and partitions from plproxy.get_cluster* + * functions. + */ +static void +reload_plproxy_cluster(ProxyFunction *func, ProxyCluster *cluster) +{ + Datum dname = DirectFunctionCall1(textin, CStringGetDatum(cluster->name)); + int cur_version; + + plproxy_cluster_plan_init(); + + /* fetch serial, also check if exists */ + cur_version = get_version(func, dname); + + /* update if needed */ + if (cur_version != cluster->version || cluster->needs_reload) + { + reload_parts(cluster, dname, func); + get_config(cluster, dname, func); + cluster->version = cur_version; + } +} + /* allocate new cluster */ static ProxyCluster * new_cluster(const char *name) @@ -344,12 +739,46 @@ new_cluster(const char *name) cluster = palloc0(sizeof(*cluster)); cluster->name = pstrdup(name); + cluster->needs_reload = true; MemoryContextSwitchTo(old_ctx); return cluster; } +/* + * Refresh the cluster. + */ +static void +refresh_cluster(ProxyFunction *func, ProxyCluster *cluster) +{ +#ifdef PLPROXY_USE_SQLMED + if (cluster->needs_reload) + { + ForeignServer *server; + + /* + * Determine if this is a SQL/MED server name or a pl/proxy compat cluster. + * Fallback to plproxy.get_cluster*() functions if a foreign server is not + * found. + */ + server = GetForeignServerByName(cluster->name, true); + cluster->sqlmed_cluster = (server != NULL); + + if (!cluster->sqlmed_cluster) + determine_compat_mode(cluster); + else + reload_sqlmed_cluster(func, cluster, server); + } +#endif + + /* Either no SQL/MED support or no such foreign server */ + if (!cluster->sqlmed_cluster) + reload_plproxy_cluster(func, cluster); + + cluster->needs_reload = false; +} + /* * Get cached or create new fake cluster. */ @@ -434,9 +863,7 @@ ProxyCluster * plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo) { ProxyCluster *cluster; - int cur_version; const char *name; - Datum dname; /* functions used CONNECT with query */ @@ -451,20 +878,12 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo) if (func->connect_str) return fake_cluster(func, func->connect_str); - /* initialize plans on demand only */ - plproxy_cluster_plan_init(); - + /* Cluster statement, either a lookup function or a name */ if (func->cluster_sql) name = resolve_query(func, fcinfo, func->cluster_sql); else name = func->cluster_name; - /* create Datum for name */ - dname = DirectFunctionCall1(textin, CStringGetDatum(name)); - - /* fetch serial, also check if exists */ - cur_version = get_version(func, dname); - /* search if cached */ for (cluster = cluster_list; cluster; cluster = cluster->next) { @@ -480,13 +899,8 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo) cluster_list = cluster; } - /* update if needed */ - if (cur_version != cluster->version) - { - reload_parts(cluster, dname, func); - get_config(cluster, dname, func); - cluster->version = cur_version; - } + /* determine cluster type, reload parts if necessary */ + refresh_cluster(func, cluster); return cluster; } diff --git a/src/main.c b/src/main.c index 36441d9..473f3ac 100644 --- a/src/main.c +++ b/src/main.c @@ -93,6 +93,7 @@ plproxy_startup_init(void) plproxy_function_cache_init(); plproxy_cluster_cache_init(); + plproxy_syscache_callback_init(); initialized = true; } diff --git a/src/plproxy.h b/src/plproxy.h index 8b8d20e..4abc503 100644 --- a/src/plproxy.h +++ b/src/plproxy.h @@ -29,6 +29,15 @@ #include #include +#if PG_VERSION_NUM >= 80400 +#define PLPROXY_USE_SQLMED +#include +#include +#include +#include +#endif + +#include #include #include #include @@ -36,13 +45,15 @@ #include #include #include +#include +#include #include #include #include +#include "utils/inval.h" #include #include #include - #include "rowstamp.h" #include @@ -165,7 +176,16 @@ typedef struct ProxyCluster int ret_cur_pos; /* Result walking: index of current row */ int ret_total; /* Result walking: total rows left */ + bool sqlmed_cluster; /* True if the cluster is defined using SQL/MED */ + bool needs_reload; /* True if the cluster partition list should be reloaded */ bool busy; /* True if the cluster is already involved in execution */ + + /* + * SQL/MED clusters: TIDs of the foreign server and user mapping catalog tuples. + * Used in to perform cluster invalidation in syscache callbacks. + */ + ItemPointerData clusterTupleId; + ItemPointerData umTupleId; } ProxyCluster; /* @@ -343,6 +363,7 @@ void plproxy_free_composite(ProxyComposite *meta); /* cluster.c */ void plproxy_cluster_cache_init(void); +void plproxy_syscache_callback_init(void); ProxyCluster *plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo); void plproxy_cluster_maint(struct timeval * now); -- 2.39.5