Enable pl/proxy clusters to be defined by SQL/MED facilities.
authorMartin Pihlak <martin.pihlak@gmail.com>
Tue, 5 Jan 2010 13:06:24 +0000 (15:06 +0200)
committerMarko Kreen <markokr@gmail.com>
Fri, 8 Jan 2010 15:24:58 +0000 (17:24 +0200)
Provide a plproxy foreign data wrapper and a function for
validating cluster definitions. It is still possible to define
clusters using configuration functions, however SQL/MED cluster
definition takes precedence if available.

Makefile
doc/config.txt
doc/todo.txt
doc/tutorial.txt
expected/plproxy_sqlmed.out [new file with mode: 0644]
plproxy_fdw.sql [new file with mode: 0644]
plproxy_lang.sql [moved from plproxy.sql.in with 100% similarity]
sql/plproxy_sqlmed.sql [new file with mode: 0644]
src/cluster.c
src/main.c
src/plproxy.h

index 13a647b3c14455d60f6e51acef38b6c5c1671144..3a9cc840e4285a00e1b9a7d3a2a1430437ebefe7 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -6,13 +6,17 @@ PLPROXY_VERSION = 2.1-cvs
 PQINC = $(shell pg_config --includedir)
 PQLIB = $(shell pg_config --libdir)
 
+# PostgreSQL version
+PGVER = $(shell pg_config --version | sed 's/PostgreSQL //')
+SQLMED = $(shell test $(PGVER) "<" "8.4" && echo "false" || echo "true")
+
 # module setup
 MODULE_big = plproxy
 SRCS = src/cluster.c src/execute.c src/function.c src/main.c \
        src/query.c src/result.c src/type.c src/poll_compat.c
 OBJS = src/scanner.o src/parser.tab.o $(SRCS:.c=.o)
 DATA_built = plproxy.sql
-EXTRA_CLEAN = src/scanner.[ch] src/parser.tab.[ch]
+EXTRA_CLEAN = src/scanner.[ch] src/parser.tab.[ch] plproxy.sql.in
 PG_CPPFLAGS = -I$(PQINC)
 SHLIB_LINK = -L$(PQLIB) -lpq
 
@@ -23,13 +27,23 @@ DIST_FILES = Makefile src/plproxy.h src/rowstamp.h src/scanner.l src/parser.y \
                         config/simple.config.sql src/poll_compat.h \
                         doc/Makefile doc/config.txt doc/overview.txt doc/faq.txt \
                         doc/syntax.txt doc/todo.txt doc/tutorial.txt \
-                        AUTHORS COPYRIGHT README plproxy.sql.in NEWS \
+                        AUTHORS COPYRIGHT README plproxy_lang.sql plproxy_fdw.sql NEWS \
                         debian/packages debian/changelog
 
 # regression testing setup
 REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \
          plproxy_errors plproxy_clustermap plproxy_dynamic_record \
          plproxy_encoding plproxy_split
+
+# SQL files
+PLPROXY_SQL = plproxy_lang.sql
+
+# SQL/MED available, add foreign data wrapper and regression tests
+ifeq ($(SQLMED), true)
+REGRESS += plproxy_sqlmed
+PLPROXY_SQL += plproxy_fdw.sql
+endif
+
 REGRESS_OPTS = --load-language=plpgsql
 
 # load PGXS makefile
@@ -54,6 +68,9 @@ src/parser.tab.c: src/parser.y
 src/scanner.c: src/scanner.l
        cd src; $(FLEX) -oscanner.c scanner.l
 
+plproxy.sql.in: $(PLPROXY_SQL)
+       cat $^ > $@
+
 # dependencies
 $(OBJS): src/plproxy.h src/rowstamp.h
 src/execute.o: src/poll_compat.h
index 9f9ba8af1836d564ff311d63b70671071c268b8b..41f1873dbb96e46ea0fd7ae8b6a3d9c0e2ff9388 100644 (file)
@@ -1,5 +1,5 @@
 
-= PL/Proxy Cluster Configuration API =
+= PL/Proxy Cluster Configuration =
 
 PL/Proxy can be used in either CONNECT mode or CLUSTER mode.
 
@@ -10,12 +10,15 @@ to a database it will proxy the request to.
 PL/Proxy can also be used in CLUSTER mode where it provides support for
 partitioning data across multiple databases based on a clustering function.
 
-When using PL/Proxy in CONNECT mode no configuration functions are required.
-However, using PL/Proxy in CLUSTER mode requires the following configuration
-functions to be defined.
+When using PL/Proxy in CONNECT mode no special configuration is required.
+However, using PL/Proxy in CLUSTER mode requires the cluster configuration
+to be defined, either by the cluster configuration API or SQL/MED.
 
+== Cluster configuration API ==
 
-== plproxy.get_cluster_version(cluster_name) ==
+The following plproxy schema functions are used to define the clusters:
+
+=== plproxy.get_cluster_version(cluster_name) ===
 
   plproxy.get_cluster_version(cluster_name text)
   returns integer
@@ -41,7 +44,7 @@ external source such as a configuration table.
 
 
 
-== plproxy.get_cluster_partitions(cluster_name) ==
+=== plproxy.get_cluster_partitions(cluster_name) ===
 
   plproxy.get_cluster_partitions(cluster_name text)
   returns setof text
@@ -93,7 +96,7 @@ An example function without the use of separate configuration tables:
   END;
   $$ LANGUAGE plpgsql;
 
-== plproxy.get_cluster_config(cluster) ==
+=== plproxy.get_cluster_config(cluster) ===
 
   plproxy.get_cluster_config(in cluster_name text,
                             out key text, out val text)
@@ -149,4 +152,57 @@ Example function without the use of separate tables for storing parameters.
   END;
   $$ LANGUAGE plpgsql;
 
+== SQL/MED cluster definitions ==
+
+Pl/Proxy can take advantage of SQL/MED connection info management available
+in PostgreSQL 8.4 and above. The benefits of using SQL/MED are simplified
+cluster definition management and slightly improved performance.
+
+Both SQL/MED defined clusters and configuration function based clusters can
+coexist in the same database. If a cluster is defined in both, SQL/MED takes
+precedence. If no SQL/MED cluster is found an attempt is made to fall back to
+configuration functions.
+
+=== Configuring SQL/MED clusters ===
+
+First we need to create a foreign data wrapper. Generally the FDW is a kind of
+driver that provides remote database access, data marshalling etc. In this
+case, it's main role is to provide a validation function that sanity checks
+your cluster definitions.
+
+Note: the validation function is known to be broken in PostgreSQL 8.4.2 and
+below.
+
+  CREATE FOREIGN DATA WRAPPER plproxy [ VALIDATOR plproxy_fdw_validator ];
+
+Next we need to define a CLUSTER, this is done by creating a SERVER that uses
+the plproxy FDW. The options to the SERVER are Pl/Proxy configuration settings
+and the list of cluster partitions.
+
+Note: USAGE access to the SERVER must be explicitly granted. Without this,
+users are unable to use the cluster.
+
+  CREATE SERVER a_cluster FOREIGN DATA WRAPPER plproxy
+       OPTIONS (
+               connection_lifetime '1800',
+               disable_binary '1',
+               p0 'dbname=part00 hostname=127.0.0.1',
+               p1 'dbname=part01 hostname=127.0.0.1',
+               p2 'dbname=part02 hostname=127.0.0.1',
+               p3 'dbname=part03 hostname=127.0.0.1'
+               );
+
+Finally we need to create a user mapping for the Pl/Proxy users. One might
+create individual mappings for specific users:
+
+  CREATE USER MAPPING FOR bob SERVER a_cluster
+       OPTIONS (user 'bob', password 'secret');
+
+or create a PUBLIC mapping for all users of the system:
+
+  CREATE USER MAPPING FOR public SERVER a_cluster
+       OPTIONS (user 'plproxy', password 'foo');
+
+Also it is possible to create both individual and PUBLIC mapping, in this case
+the individual mapping takes precedence.
 
index e8f9e5b07ca161c0ca33897e6d533d043ec12133..63b38b4a7ab47141f4a0c0c3020499cec5d5a55f 100644 (file)
@@ -9,8 +9,6 @@
 
  * Lazy value type cache, per-conn binary I/O decision.
 
- * named connactions (ala SQL-MED)
-
 == Just thoughts ==
 
  * Drop plproxy.get_cluster_config()...
index b2a6351fdd1474aa319d74487a515911e53bd25e..759447bd327915ccac8d9b607c8306dd08bd3d78 100644 (file)
@@ -60,6 +60,8 @@ data with PL/Proxy.
 == Create  configuration functions ==
 
 Using PL/Proxy for partitioning requires setting up some configuration functions.
+Alternatively, if you are running PostgreSQL 8.4 or above you can take advantage
+of the SQL/MED connection management facilities. See below.
 
 When a query needs to be forwarded to a remote database the function
 plproxy.get_cluster_partitions(cluster) is invoked by plproxy to get the
@@ -117,6 +119,39 @@ on what this function can do.
 The config section contains more information on all of these functions.
 
 
+== Configuring Pl/Proxy clusters with SQL/MED ==
+
+First we need a foreign data wrapper. This is mostly a placeholder, but can
+be extended with a validator function to verify the cluster definition.
+See http://www.postgresql.org/docs/8.4/static/sql-createforeigndatawrapper.html
+for additional details of how to manage the SQL/MED catalog.
+
+  CREATE FOREIGN DATA WRAPPER plproxy;
+
+Then the actual cluster with its configuration options and partitions:
+
+  CREATE SERVER usercluster FOREIGN DATA WRAPPER plproxy
+  OPTIONS (    connection_lifetime '1800',
+               p0 'dbname=part00 host=127.0.0.1',
+               p1 'dbname=part01 host=127.0.0.1' );
+
+We also need a user mapping that maps local PostgreSQL users to remote
+partitions. It is possible to create PUBLIC mapping that applies for
+all users in the local system:
+
+  CREATE USER MAPPING FOR PUBLIC SERVER usercluster;
+
+Or a private mapping that can only be used by specific users:
+
+  CREATE USER MAPPING FOR bob SERVER usercluster 
+       OPTIONS (user 'plproxy', password 'salakala');
+
+Finally we need to grant USAGE on the cluster to specific users:
+
+  GRANT USAGE ON SERVER usercluster TO bob;
+
+
+
 == Partitioned remote call ==
 
 Here we assume that the user table is spread over several databases based
diff --git a/expected/plproxy_sqlmed.out b/expected/plproxy_sqlmed.out
new file mode 100644 (file)
index 0000000..9d831e5
--- /dev/null
@@ -0,0 +1,85 @@
+drop foreign data wrapper if exists plproxy cascade;
+create foreign data wrapper plproxy;
+create server sqlmedcluster foreign data wrapper plproxy 
+    options (   partition_0 'dbname=test_part3 host=localhost',
+                partition_1 'dbname=test_part2 host=localhost',
+                partition_2 'dbname=test_part1 host=localhost',
+                partition_3 'dbname=test_part0 host=localhost');
+create or replace function sqlmed_test1() returns setof text as $$
+    cluster 'sqlmedcluster';
+    run on 0;
+    select 'plproxy: user=' || current_user || ' dbname=' || current_database();
+$$ language plproxy;
+drop user if exists test_user_bob;
+create user test_user_bob password 'secret';
+-- no user mapping
+set session authorization test_user_bob;
+select * from sqlmed_test1();
+ERROR:  user mapping not found for "test_user_bob"
+reset session authorization;
+-- add a public user mapping 
+create user mapping for public server sqlmedcluster
+    options (   user        'test_user_bob',
+                password    'secret1');
+-- no access to foreign server
+set session authorization test_user_bob;
+select * from sqlmed_test1();
+ERROR:  permission denied for foreign server sqlmedcluster
+reset session authorization;
+-- ok, access granted
+grant usage on foreign server sqlmedcluster to test_user_bob;
+set session authorization test_user_bob;
+select * from sqlmed_test1();
+                 sqlmed_test1                  
+-----------------------------------------------
+ plproxy: user=test_user_bob dbname=test_part3
+(1 row)
+
+reset session authorization;
+-- cluster definition validation
+-- partition numbers must be consecutive
+alter server sqlmedcluster options (drop partition_2);
+select * from sqlmed_test1();
+ERROR:  PL/Proxy function public.sqlmed_test1(0): partitions numbers must be consecutive
+-- invalid partition count
+alter server sqlmedcluster options 
+    (drop partition_3,
+     add  partition_2 'dbname=test_part1 host=localhost');
+select * from sqlmed_test1();
+ERROR:  PL/Proxy function public.sqlmed_test1(0): invalid partition count
+-- switching betweem SQL/MED and compat mode
+create or replace function sqlmed_compat_test() returns setof text as $$
+    cluster 'testcluster';
+    run on 0;
+    select 'plproxy: part=' || current_database();
+$$ language plproxy;
+-- testcluster
+select * from sqlmed_compat_test();
+    sqlmed_compat_test    
+--------------------------
+ plproxy: part=test_part0
+(1 row)
+
+-- override the test cluster with a SQL/MED definition
+drop server if exists testcluster cascade;
+NOTICE:  server "testcluster" does not exist, skipping
+create server testcluster foreign data wrapper plproxy 
+    options (partition_0 'dbname=regression host=localhost');
+create user mapping for public server testcluster;
+-- sqlmed testcluster
+select * from sqlmed_compat_test();
+    sqlmed_compat_test    
+--------------------------
+ plproxy: part=regression
+(1 row)
+
+-- now drop the SQL/MED testcluster, and test fallback
+drop server testcluster cascade;
+NOTICE:  drop cascades to user mapping for public
+-- back on testcluster again
+select * from sqlmed_compat_test();
+    sqlmed_compat_test    
+--------------------------
+ plproxy: part=test_part0
+(1 row)
+
diff --git a/plproxy_fdw.sql b/plproxy_fdw.sql
new file mode 100644 (file)
index 0000000..bf23f74
--- /dev/null
@@ -0,0 +1,7 @@
+-- validator function
+CREATE FUNCTION plproxy_fdw_validator (text[], oid)
+RETURNS boolean AS 'MODULE_PATHNAME' LANGUAGE C;
+
+-- foreign data wrapper
+CREATE FOREIGN DATA WRAPPER plproxy VALIDATOR plproxy_fdw_validator;
+
similarity index 100%
rename from plproxy.sql.in
rename to plproxy_lang.sql
diff --git a/sql/plproxy_sqlmed.sql b/sql/plproxy_sqlmed.sql
new file mode 100644 (file)
index 0000000..9ed64ac
--- /dev/null
@@ -0,0 +1,77 @@
+drop foreign data wrapper if exists plproxy cascade;
+create foreign data wrapper plproxy;
+
+create server sqlmedcluster foreign data wrapper plproxy 
+    options (   partition_0 'dbname=test_part3 host=localhost',
+                partition_1 'dbname=test_part2 host=localhost',
+                partition_2 'dbname=test_part1 host=localhost',
+                partition_3 'dbname=test_part0 host=localhost');
+
+create or replace function sqlmed_test1() returns setof text as $$
+    cluster 'sqlmedcluster';
+    run on 0;
+    select 'plproxy: user=' || current_user || ' dbname=' || current_database();
+$$ language plproxy;
+
+drop user if exists test_user_bob;
+create user test_user_bob password 'secret';
+
+-- no user mapping
+set session authorization test_user_bob;
+select * from sqlmed_test1();
+reset session authorization;
+
+-- add a public user mapping 
+create user mapping for public server sqlmedcluster
+    options (   user        'test_user_bob',
+                password    'secret1');
+
+-- no access to foreign server
+set session authorization test_user_bob;
+select * from sqlmed_test1();
+reset session authorization;
+
+-- ok, access granted
+grant usage on foreign server sqlmedcluster to test_user_bob;
+set session authorization test_user_bob;
+select * from sqlmed_test1();
+reset session authorization;
+
+-- cluster definition validation
+
+-- partition numbers must be consecutive
+alter server sqlmedcluster options (drop partition_2);
+select * from sqlmed_test1();
+
+-- invalid partition count
+alter server sqlmedcluster options 
+    (drop partition_3,
+     add  partition_2 'dbname=test_part1 host=localhost');
+select * from sqlmed_test1();
+
+-- switching betweem SQL/MED and compat mode
+
+create or replace function sqlmed_compat_test() returns setof text as $$
+    cluster 'testcluster';
+    run on 0;
+    select 'plproxy: part=' || current_database();
+$$ language plproxy;
+
+-- testcluster
+select * from sqlmed_compat_test();
+
+-- override the test cluster with a SQL/MED definition
+drop server if exists testcluster cascade;
+create server testcluster foreign data wrapper plproxy 
+    options (partition_0 'dbname=regression host=localhost');
+create user mapping for public server testcluster;
+
+-- sqlmed testcluster
+select * from sqlmed_compat_test();
+
+-- now drop the SQL/MED testcluster, and test fallback
+drop server testcluster cascade;
+
+-- back on testcluster again
+select * from sqlmed_compat_test();
+
index 3feb063de66cd6c0d0b14ec7ba723e5ea4a881a8..7a85f3657f1ee5129e7166e121c0b0aeadc628ee 100644 (file)
@@ -61,6 +61,21 @@ static const char part_sql[] = "select * from plproxy.get_cluster_partitions($1)
 /* query for fetching cluster config */
 static const char config_sql[] = "select * from plproxy.get_cluster_config($1)";
 
+#ifdef PLPROXY_USE_SQLMED
+
+/* list of all the valid configuration options to plproxy cluster */
+static const char *cluster_config_options[] = {
+       "statement_timeout",
+       "connection_lifetime",
+       "query_timeout",
+       "disable_binary",
+       NULL
+};
+
+extern Datum plproxy_fdw_validator(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(plproxy_fdw_validator);
+
+#endif
 
 /*
  * Connsetion count should be non-zero and power of 2.
@@ -161,10 +176,10 @@ free_connlist(ProxyCluster *cluster)
  * Add new database connection if it does not exists.
  */
 static ProxyConnection *
-add_connection(ProxyCluster *cluster, char *connstr)
+add_connection(ProxyCluster *cluster, char *connstr, int part_num)
 {
        int                     i;
-       ProxyConnection *conn;
+       ProxyConnection *conn = NULL;
        char       *username;
        StringInfo      final;
 
@@ -179,16 +194,22 @@ add_connection(ProxyCluster *cluster, char *connstr)
        }
 
        /* check if already have it */
-       for (i = 0; i < cluster->conn_count; i++)
+       for (i = 0; i < cluster->conn_count && !conn; i++)
        {
-               conn = &cluster->conn_list[i];
-               if (strcmp(conn->connstr, final->data) == 0)
-                       return conn;
+               ProxyConnection *c = &cluster->conn_list[i];
+
+               if (strcmp(c->connstr, final->data) == 0)
+                       conn = c;
        }
 
        /* add new connection */
-       conn = &cluster->conn_list[cluster->conn_count++];
-       conn->connstr = MemoryContextStrdup(cluster_mem, final->data);
+       if (!conn)
+       {
+               conn = &cluster->conn_list[cluster->conn_count++];
+               conn->connstr = MemoryContextStrdup(cluster_mem, final->data);
+       }
+
+       cluster->part_map[part_num] = conn;
 
        return conn;
 }
@@ -223,6 +244,22 @@ get_version(ProxyFunction *func, Datum dname)
        return DatumGetInt32(bin_val);
 }
 
+/* set a configuration option. */
+static void
+set_config_key(ProxyFunction *func, ProxyConfig *cf, const char *key, const char *val)
+{
+       if (pg_strcasecmp(key, "statement_timeout") == 0)
+               /* ignore */ ;
+       else if (pg_strcasecmp("connection_lifetime", key) == 0)
+               cf->connection_lifetime = atoi(val);
+       else if (pg_strcasecmp("query_timeout", key) == 0)
+               cf->query_timeout = atoi(val);
+       else if (pg_strcasecmp("disable_binary", key) == 0)
+               cf->disable_binary = atoi(val);
+       else
+               plproxy_error(func, "Unknown config param: %s", key);
+}
+
 /*
  * Fetch cluster configuration.
  */
@@ -234,7 +271,6 @@ get_config(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
        TupleDesc       desc;
        const char *key,
                           *val;
-       ProxyConfig *cf = &cluster->config;
 
        /* run query */
        err = SPI_execute_plan(config_plan, &dname, NULL, false, 0);
@@ -263,30 +299,40 @@ get_config(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
                if (val == NULL)
                        plproxy_error(func, "val must not be NULL");
 
-               if (pg_strcasecmp(key, "statement_timeout") == 0)
-                       /* ignore */ ;
-               else if (pg_strcasecmp("connection_lifetime", key) == 0)
-                       cf->connection_lifetime = atoi(val);
-               else if (pg_strcasecmp("query_timeout", key) == 0)
-                       cf->query_timeout = atoi(val);
-               else if (pg_strcasecmp("disable_binary", key) == 0)
-                       cf->disable_binary = atoi(val);
-               else
-                       plproxy_error(func, "Unknown config param: %s", key);
+               set_config_key(func, &cluster->config, key, val);
        }
 
        return 0;
 }
 
+/* allocate memory for cluster partitions */
+static void
+allocate_cluster_partitions(ProxyCluster *cluster, int nparts)
+{
+       MemoryContext old_ctx;
+
+       /* free old one */
+       if (cluster->conn_list)
+               free_connlist(cluster);
+
+       cluster->part_count = nparts;
+       cluster->part_mask = cluster->part_count - 1;
+
+       /* allocate lists */
+       old_ctx = MemoryContextSwitchTo(cluster_mem);
+
+       cluster->part_map = palloc0(nparts * sizeof(ProxyConnection *));
+       cluster->conn_list = palloc0(nparts * sizeof(ProxyConnection));
+       MemoryContextSwitchTo(old_ctx);
+}
+
 /* fetch list of parts */
 static int
 reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
 {
        int                     err,
                                i;
-       ProxyConnection *conn;
        char       *connstr;
-       MemoryContext old_ctx;
        TupleDesc       desc;
        HeapTuple       row;
 
@@ -304,18 +350,7 @@ reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
        if (SPI_gettypeid(desc, 1) != TEXTOID)
                plproxy_error(func, "partition column 1 must be text");
 
-       /* free old one */
-       if (cluster->conn_list)
-               free_connlist(cluster);
-
-       cluster->part_count = SPI_processed;
-       cluster->part_mask = cluster->part_count - 1;
-
-       /* allocate lists */
-       old_ctx = MemoryContextSwitchTo(cluster_mem);
-       cluster->part_map = palloc0(SPI_processed * sizeof(ProxyConnection *));
-       cluster->conn_list = palloc0(SPI_processed * sizeof(ProxyConnection));
-       MemoryContextSwitchTo(old_ctx);
+       allocate_cluster_partitions(cluster, SPI_processed);
 
        /* fill values */
        for (i = 0; i < SPI_processed; i++)
@@ -326,13 +361,373 @@ reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
                if (connstr == NULL)
                        plproxy_error(func, "connstr must not be NULL");
 
-               conn = add_connection(cluster, connstr);
-               cluster->part_map[i] = conn;
+               add_connection(cluster, connstr, i);
        }
 
        return 0;
 }
 
+#ifdef PLPROXY_USE_SQLMED
+
+/* extract a partition number from foreign server option */
+static bool
+extract_part_num(const char *partname, int *part_num)
+{
+       char *partition_tags[] = { "p", "partition_", NULL };
+       char **part_tag;
+       char *errptr;
+
+       for (part_tag = partition_tags; *part_tag; part_tag++)
+       {
+               if (strstr(partname, *part_tag) == partname)
+               {
+                       *part_num = (int) strtoul(partname + strlen(*part_tag), &errptr, 10);
+                       if (*errptr == '\0')
+                               return true;
+               }
+       }
+
+       return false;
+}
+
+/*
+ * Validate the generic option given to servers or user mappings defined with
+ * plproxy foreign data wrapper.  Raise an ERROR if the option or its value is
+ * considered invalid.
+ */
+Datum
+plproxy_fdw_validator(PG_FUNCTION_ARGS)
+{
+       List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+       Oid                     catalog = PG_GETARG_OID(1);
+       ListCell   *cell;
+       int                     part_count = 0;
+
+       /* Pre 8.4.3 databases have broken validator interface, warn the user */
+       if (catalog == InvalidOid)
+       {
+               ereport(NOTICE,
+                               (errcode(ERRCODE_WARNING),
+                                errmsg("Pl/Proxy: foreign data wrapper validator disabled"),
+                                errhint("validator is usable starting from PostgreSQL version 8.4.3")));
+
+               PG_RETURN_BOOL(false);
+       }
+
+       foreach(cell, options_list)
+       {
+               DefElem    *def = lfirst(cell);
+               char       *arg = strVal(def->arg);
+               int                     part_num;
+
+               if (catalog == ForeignServerRelationId)
+               {
+                       if (extract_part_num(def->defname, &part_num))
+                       {
+                               /* partition definition */
+                               if (part_num != part_count)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                                        errmsg("Pl/Proxy: partitions must be numbered consecutively"),
+                                                        errhint("next valid partition number is %d", part_count)));
+                               ++part_count;
+                       }
+                       else
+                       {
+                               const char **opt;
+
+                               /* see that a valid config option is specified */
+                               for (opt = cluster_config_options; *opt; opt++)
+                               {
+                                       if (pg_strcasecmp(*opt, def->defname) == 0)
+                                               break;
+                               }
+
+                               if (*opt == NULL)
+                                       elog(ERROR, "Pl/Proxy: invalid server option: %s", def->defname);
+                               else if (strspn(arg, "0123456789") != strlen(arg))
+                                       elog(ERROR, "Pl/Proxy: only integer options are allowed: %s=%s",
+                                                def->defname, arg);
+                       }
+               }
+               else if (catalog == UserMappingRelationId)
+               {
+                       /* user mapping only accepts "user" and "password" */
+                       if (pg_strcasecmp(def->defname, "user") != 0 &&
+                               pg_strcasecmp(def->defname, "password") != 0)
+                       {
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("Pl/Proxy: invalid option to user mapping"),
+                                                errhint("valid options are \"user\" and \"password\"")));
+                       }
+               }
+               else if (catalog == ForeignDataWrapperRelationId)
+               {
+                        /* At the moment there are no options to the fdw itself */
+                        elog(WARNING, "Pl/Proxy: foreign data wrapper takes no options");
+               }
+       }
+
+       if (catalog == ForeignServerRelationId)
+       {
+               if (!check_valid_partcount(part_count))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("Pl/Proxy: invalid number of partitions"),
+                                        errhint("the number of partitions in a cluster must be power of 2 (attempted %d)", part_count)));
+       }
+
+       PG_RETURN_BOOL(true);
+}
+
+/*
+ * Reload the cluster configuration and partitions from SQL/MED catalogs.
+ */
+static void
+reload_sqlmed_cluster(ProxyFunction *func, ProxyCluster *cluster,
+                                         ForeignServer *foreign_server)
+{
+       UserMapping        *user_mapping;
+       ForeignDataWrapper *fdw;
+       HeapTuple                       tup;
+       AclResult                       aclresult;
+       StringInfo          user_options;
+       ListCell                   *cell;
+       Oid                                     userid;
+       int                                     part_count = 0;
+       int                                     part_num;
+
+
+       userid = GetSessionUserId();
+       user_mapping = GetUserMapping(userid, foreign_server->serverid);
+       fdw = GetForeignDataWrapper(foreign_server->fdwid);
+
+       /*
+        * Look up the server and user mapping TIDs for handling syscache invalidations.
+        */
+    tup = SearchSysCache(FOREIGNSERVEROID,
+                                                ObjectIdGetDatum(foreign_server->serverid),
+                                                0, 0, 0);
+
+    if (!HeapTupleIsValid(tup))
+        elog(ERROR, "cache lookup failed for foreign server %u", foreign_server->serverid);
+
+       cluster->clusterTupleId = tup->t_self;
+       ReleaseSysCache(tup);
+
+    tup = SearchSysCache(USERMAPPINGUSERSERVER,
+                                                ObjectIdGetDatum(user_mapping->userid),
+                                                ObjectIdGetDatum(foreign_server->serverid),
+                                                0, 0);
+
+    if (!HeapTupleIsValid(tup))
+       {
+               /* Specific mapping not found, try PUBLIC */
+               tup = SearchSysCache(USERMAPPINGUSERSERVER,
+                                                        ObjectIdGetDatum(InvalidOid),
+                                                        ObjectIdGetDatum(foreign_server->serverid),
+                                                        0, 0);
+               if (!HeapTupleIsValid(tup))
+                       elog(ERROR, "cache lookup failed for user mapping (%u,%u)",
+                               user_mapping->userid, foreign_server->serverid);
+       }
+
+       cluster->umTupleId = tup->t_self;
+       ReleaseSysCache(tup);
+
+       /*
+        * Check permissions, user must have usage on the server.
+        */
+       aclresult = pg_foreign_server_aclcheck(foreign_server->serverid, userid, ACL_USAGE);
+       if (aclresult != ACLCHECK_OK)
+               aclcheck_error(aclresult, ACL_KIND_FOREIGN_SERVER, foreign_server->servername);
+
+       /* Extract the common connect string elements from user mapping */
+       user_options = makeStringInfo();
+       foreach(cell, user_mapping->options)
+       {
+               DefElem    *def = lfirst(cell);
+
+               appendStringInfo(user_options, "%s='%s' ", def->defname, strVal(def->arg));
+       }
+
+       /*
+        * Collect the cluster configuration and partition definitions from foreign
+        * server options. At first pass just collect the cluster options and count
+        * the number of partitions.
+        */
+       foreach(cell, foreign_server->options)
+       {
+               DefElem    *def = lfirst(cell);
+
+               if (extract_part_num(def->defname, &part_num))
+               {
+                       if (part_num != part_count)
+                               plproxy_error(func, "partitions numbers must be consecutive");
+
+                       part_count++;
+               }
+               else
+                       set_config_key(func, &cluster->config, def->defname, strVal(def->arg));
+       }
+
+       if (!check_valid_partcount(part_count))
+               plproxy_error(func, "invalid partition count");
+
+       /*
+        * Now that the partition count is known, allocate the partitions and make
+        * a second pass over the options adding each connstr to cluster.
+        */
+       allocate_cluster_partitions(cluster, part_count);
+
+       foreach(cell, foreign_server->options)
+       {
+               DefElem    *def = lfirst(cell);
+               StringInfo  buf = makeStringInfo();
+
+               if (!extract_part_num(def->defname, &part_num))
+                       continue;
+
+               appendStringInfo(buf, "%s%s%s", strVal(def->arg),
+                               user_options->len ? " " : "",
+                               user_options->data);
+               add_connection(cluster, buf->data, part_num);
+       }
+}
+
+/*
+ * We have no SQL/MED definition for the cluster, determine if the
+ * cluster is defined using the compat functions. Raise an ERROR
+ * if the plproxy schema functions don't exist.
+ */
+static void
+determine_compat_mode(ProxyCluster *cluster)
+{
+       bool            have_compat = false;
+       HeapTuple       tup;
+
+       /*
+        * See that we have plproxy schema and all the necessary functions
+        */
+
+       tup = SearchSysCache(NAMESPACENAME, PointerGetDatum("plproxy"), 0, 0, 0);
+       if (HeapTupleIsValid(tup))
+       {
+               Oid             namespaceId = HeapTupleGetOid(tup);
+               Oid                     paramOids[] = { TEXTOID };
+               oidvector       *parameterTypes = buildoidvector(paramOids, 1);
+               const char      **funcname;
+
+               /* All of the functions required to run pl/proxy in compat mode */
+               static const char *compat_functions[] = {
+                       "get_cluster_version",
+                       "get_cluster_config",
+                       "get_cluster_partitions",
+                       NULL
+               };
+
+               for (funcname = compat_functions; *funcname; funcname++)
+               {
+                       if (!SearchSysCacheExists(PROCNAMEARGSNSP,
+                                                                         PointerGetDatum(*funcname),
+                                                                         PointerGetDatum(parameterTypes),
+                                                                         ObjectIdGetDatum(namespaceId),
+                                                                         0))
+                               break;
+               }
+
+               /* we have the schema and all the functions - use compat */
+               if (! *funcname)
+                       have_compat = true;
+
+               ReleaseSysCache(tup);
+       }
+
+       if (!have_compat)
+               elog(ERROR, "Pl/Proxy: cluster not found: %s", cluster->name);
+}
+
+/*
+ * Syscache inval callback function for foreign servers and user mappings.
+ *
+ * Note: this invalidates compat clusters on any foreign server change. This
+ * allows SQL/MED clusters to override those defined by plproxy schema
+ * functions.
+ */
+static void
+ClusterSyscacheCallback(Datum arg, int cacheid, ItemPointer tuplePtr)
+{
+       ProxyCluster *cluster;
+
+       for (cluster = cluster_list; cluster; cluster = cluster->next)
+       {
+               if (cluster->needs_reload)
+               {
+                       /* already invalidated */
+                       continue;
+               }
+               else if (!tuplePtr)
+               {
+                       /* invalidate all */
+                       cluster->needs_reload = true;
+               }
+               else if (!cluster->sqlmed_cluster)
+               {
+                       /* allow new SQL/MED servers to override compat definitions */
+                       cluster->needs_reload = (cacheid == FOREIGNSERVEROID);
+               }
+               else if (cacheid == USERMAPPINGOID)
+               {
+                       /* user mappings changed */
+                       cluster->needs_reload = ItemPointerEquals(tuplePtr, &cluster->umTupleId);
+               }
+               else if (cacheid == FOREIGNSERVEROID)
+               {
+                       /* server definitions changed */
+                       cluster->needs_reload = ItemPointerEquals(tuplePtr, &cluster->clusterTupleId);
+               }
+       }
+}
+
+#endif
+
+/*
+ * Register syscache invalidation callbacks for SQL/MED clusters.
+ */
+void
+plproxy_syscache_callback_init(void)
+{
+#ifdef PLPROXY_USE_SQLMED
+       CacheRegisterSyscacheCallback(FOREIGNSERVEROID, ClusterSyscacheCallback, (Datum) 0);
+       CacheRegisterSyscacheCallback(USERMAPPINGOID, ClusterSyscacheCallback, (Datum) 0);
+#endif
+}
+
+/*
+ * Reload the cluster configuration and partitions from plproxy.get_cluster*
+ * functions.
+ */
+static void
+reload_plproxy_cluster(ProxyFunction *func, ProxyCluster *cluster)
+{
+       Datum   dname = DirectFunctionCall1(textin, CStringGetDatum(cluster->name));
+       int             cur_version;
+
+       plproxy_cluster_plan_init();
+
+       /* fetch serial, also check if exists */
+       cur_version = get_version(func, dname);
+
+       /* update if needed */
+       if (cur_version != cluster->version || cluster->needs_reload)
+       {
+               reload_parts(cluster, dname, func);
+               get_config(cluster, dname, func);
+               cluster->version = cur_version;
+       }
+}
+
 /* allocate new cluster */
 static ProxyCluster *
 new_cluster(const char *name)
@@ -344,12 +739,46 @@ new_cluster(const char *name)
 
        cluster = palloc0(sizeof(*cluster));
        cluster->name = pstrdup(name);
+       cluster->needs_reload = true;
 
        MemoryContextSwitchTo(old_ctx);
 
        return cluster;
 }
 
+/*
+ * Refresh the cluster.
+ */
+static void
+refresh_cluster(ProxyFunction *func, ProxyCluster *cluster)
+{
+#ifdef PLPROXY_USE_SQLMED
+       if (cluster->needs_reload)
+       {
+               ForeignServer *server;
+
+               /*
+                * Determine if this is a SQL/MED server name or a pl/proxy compat cluster.
+                * Fallback to plproxy.get_cluster*() functions if a foreign server is not
+                * found.
+                */
+               server = GetForeignServerByName(cluster->name, true);
+               cluster->sqlmed_cluster = (server != NULL);
+
+               if (!cluster->sqlmed_cluster)
+                       determine_compat_mode(cluster);
+               else
+                       reload_sqlmed_cluster(func, cluster, server);
+       }
+#endif
+
+       /* Either no SQL/MED support or no such foreign server */
+       if (!cluster->sqlmed_cluster)
+               reload_plproxy_cluster(func, cluster);
+
+       cluster->needs_reload = false;
+}
+
 /*
  * Get cached or create new fake cluster.
  */
@@ -434,9 +863,7 @@ ProxyCluster *
 plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
 {
        ProxyCluster *cluster;
-       int                     cur_version;
        const char *name;
-       Datum           dname;
 
 
        /* functions used CONNECT with query */
@@ -451,20 +878,12 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
        if (func->connect_str)
                return fake_cluster(func, func->connect_str);
 
-       /* initialize plans on demand only */
-       plproxy_cluster_plan_init();
-
+       /* Cluster statement, either a lookup function or a name */
        if (func->cluster_sql)
                name = resolve_query(func, fcinfo, func->cluster_sql);
        else
                name = func->cluster_name;
 
-       /* create Datum for name */
-       dname = DirectFunctionCall1(textin, CStringGetDatum(name));
-
-       /* fetch serial, also check if exists */
-       cur_version = get_version(func, dname);
-
        /* search if cached */
        for (cluster = cluster_list; cluster; cluster = cluster->next)
        {
@@ -480,13 +899,8 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
                cluster_list = cluster;
        }
 
-       /* update if needed */
-       if (cur_version != cluster->version)
-       {
-               reload_parts(cluster, dname, func);
-               get_config(cluster, dname, func);
-               cluster->version = cur_version;
-       }
+       /* determine cluster type, reload parts if necessary */
+       refresh_cluster(func, cluster);
 
        return cluster;
 }
index 36441d96b88399a6c40552f022b2b90ecbf7df3e..473f3ac69fea9b5c29220bdc55c58042dbe9af36 100644 (file)
@@ -93,6 +93,7 @@ plproxy_startup_init(void)
 
        plproxy_function_cache_init();
        plproxy_cluster_cache_init();
+       plproxy_syscache_callback_init();
 
        initialized = true;
 }
index 8b8d20e628e0a0edd8a6aa83c51e0fbcec447adc..4abc503181f7679d5e755c0f6c3f3adc5aaa2caf 100644 (file)
 #include <fmgr.h>
 #include <executor/spi.h>
 
+#if PG_VERSION_NUM >= 80400
+#define PLPROXY_USE_SQLMED
+#include <foreign/foreign.h>
+#include <catalog/pg_foreign_data_wrapper.h>
+#include <catalog/pg_foreign_server.h>
+#include <catalog/pg_user_mapping.h>
+#endif
+
+#include <access/reloptions.h>
 #include <access/tupdesc.h>
 #include <catalog/pg_namespace.h>
 #include <catalog/pg_proc.h>
 #include <commands/trigger.h>
 #include <mb/pg_wchar.h>
 #include <miscadmin.h>
+#include <nodes/value.h>
+#include <utils/acl.h>
 #include <utils/array.h>
 #include <utils/builtins.h>
 #include <utils/hsearch.h>
+#include "utils/inval.h"
 #include <utils/lsyscache.h>
 #include <utils/memutils.h>
 #include <utils/syscache.h>
-
 #include "rowstamp.h"
 
 #include <libpq-fe.h>
@@ -165,7 +176,16 @@ typedef struct ProxyCluster
        int                     ret_cur_pos;    /* Result walking: index of current row */
        int                     ret_total;              /* Result walking: total rows left */
 
+       bool            sqlmed_cluster; /* True if the cluster is defined using SQL/MED */
+       bool            needs_reload;   /* True if the cluster partition list should be reloaded */
        bool            busy;                   /* True if the cluster is already involved in execution */
+
+       /*
+        * SQL/MED clusters: TIDs of the foreign server and user mapping catalog tuples.
+        * Used in to perform cluster invalidation in syscache callbacks.
+        */
+       ItemPointerData         clusterTupleId;
+       ItemPointerData         umTupleId;
 } ProxyCluster;
 
 /*
@@ -343,6 +363,7 @@ void                plproxy_free_composite(ProxyComposite *meta);
 
 /* cluster.c */
 void           plproxy_cluster_cache_init(void);
+void           plproxy_syscache_callback_init(void);
 ProxyCluster *plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo);
 void           plproxy_cluster_maint(struct timeval * now);