New SPLIT statement.
authorMarko Kreen <markokr@gmail.com>
Tue, 10 Nov 2009 13:19:27 +0000 (13:19 +0000)
committerMarko Kreen <markokr@gmail.com>
Tue, 10 Nov 2009 13:19:27 +0000 (13:19 +0000)
It will split incoming array(s) into per-partition arrays.

Roughly based on design here:

  http://lists.pgfoundry.org/pipermail/plproxy-users/2008-June/000093.html

Written by Martin Pihlak

14 files changed:
Makefile
doc/syntax.txt
doc/todo.txt
expected/plproxy_split.out [new file with mode: 0644]
sql/plproxy_split.sql [new file with mode: 0644]
src/cluster.c
src/execute.c
src/function.c
src/main.c
src/parser.y
src/plproxy.h
src/query.c
src/scanner.l
src/type.c

index 4adcce3cf068e6ea2266bf1a9d5bdca7f4793b50..c1ac40f284a773d821ec5ac398b6225c2ef3482a 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -29,7 +29,7 @@ DIST_FILES = Makefile src/plproxy.h src/rowstamp.h src/scanner.l src/parser.y \
 # regression testing setup
 REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \
          plproxy_errors plproxy_clustermap plproxy_dynamic_record \
-         plproxy_encoding
+         plproxy_encoding plproxy_split
 REGRESS_OPTS = --load-language=plpgsql
 
 # load PGXS makefile
index 2fc120a402486623392a983872fa975dc99fd80b..72de3c63d3eb0e672a235a915e8e7215b1d9c71f 100644 (file)
@@ -71,6 +71,62 @@ tagged, query will be sent in parallel to them.
 
 Take hash value directly from function argument.
 
+
+== SPLIT ==
+
+  SPLIT array_arg_1 [ , array_arg_2 ... ] ;
+
+Split the input arrays based on RUN ON statement into per-partition arrays.
+This is done by evaluating RUN ON condition for each array element and building
+per-partition parameter arrays for each matching partition. During execution
+each tagged partition then gets its own subset of the array to process.
+
+The semantics of RUN ON statement is slightly changed with SPLIT arrays:
+
+  RUN ON partition_func(..);
+
+The array is split between the partitions matching `partition_func()`. Any
+SPLIT parameters passed to the function are actually replaced with the
+individual array elements.
+
+  RUN ON argname;  RUN ON $1;
+
+An array of partition numbers (or hashes) can be passed as `argname`. The function
+shall be run on the partitions specified in the array.
+
+  RUN ON ANY;
+
+Each element is assigned to random partition.
+
+  RUN ON ALL;
+  RUN ON <NR>;
+
+Unaffected, except for the added overhead of array copying.
+
+Example:
+
+  CREATE FUNCTION set_profiles(i_users text[], i_profiles text[])
+  RETURNS SETOF text AS $$
+    CLUSTER 'userdb';
+    SPLIT i_users, i_profiles;
+    RUN ON hashtext(i_users);
+  $$ LANGUAGE plproxy;
+
+Given query:
+
+  SELECT * FROM set_profiles(ARRAY['foo', 'bar'], ARRAY['a', 'b']);
+
+The hash function is called 2 times:
+
+  SELECT * FROM hashtext('foo');
+  SELECT * FROM hashtext('bar');
+
+And target partitions get queries:
+
+  SELECT * FROM set_profiles(ARRAY['foo'], ARRAY['a']);
+  SELECT * FROM set_profiles(ARRAY['bar'], ARRAY['b']);
+
+
 == SELECT ==
 
   SELECT .... ;
index 0f1bad5ed151c93e6d1a98ca9c08be6d42395413..16ed61e1c5e2886933d2c95c1fbaceaca03e6157 100644 (file)
@@ -13,7 +13,6 @@
 
 == Just thoughts ==
 
- * SPREAD BY clause for OLAP loads.
  * Drop plproxy.get_cluster_config()...
 
  * Dynamic connstr loading for CONNECT functions?
diff --git a/expected/plproxy_split.out b/expected/plproxy_split.out
new file mode 100644 (file)
index 0000000..4001b86
--- /dev/null
@@ -0,0 +1,125 @@
+-- partition functions
+\c test_part0
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+\c test_part1
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+\c test_part2
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+\c test_part3
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+\c regression
+-- invalid arg reference
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $4; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+ERROR:  PL/Proxy function public.test_array(3): Compile error at line 1: invalid argument reference: $4
+-- invalid arg name
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split x; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+ERROR:  PL/Proxy function public.test_array(3): Compile error at line 1: invalid argument reference: x
+-- cannot split more than once
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+ERROR:  PL/Proxy function public.test_array(3): SPLIT parameter specified more than once: b
+-- attempt to split non-array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $3; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+ERROR:  PL/Proxy function public.test_array(3): SPLIT parameter is not an array: $3
+-- array size/dimensions mismatch
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+ERROR:  PL/Proxy function public.test_array(3): split arrays must be of identical lengths
+select * from test_array(array['a','b','c','d'], null, 'foo');
+ERROR:  PL/Proxy function public.test_array(3): split arrays must be of identical lengths
+select * from test_array(null, array['e','f','g','h'], 'foo');
+ERROR:  PL/Proxy function public.test_array(3): split arrays must be of identical lengths
+select * from test_array(array[array['a1'],array['a2']], array[array['b1'],array['b2']], 'foo');
+ERROR:  PL/Proxy function public.test_array(3): split multi-dimensional arrays are not supported
+-- run on array hash, split one array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+            test_array             
+-----------------------------------
+ test_part0 $1:d $2:e,f,g,h $3:foo
+ test_part1 $1:a $2:e,f,g,h $3:foo
+ test_part2 $1:b $2:e,f,g,h $3:foo
+ test_part3 $1:c $2:e,f,g,h $3:foo
+(4 rows)
+
+-- run on text hash, split two arrays (nop split)
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(c);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+               test_array                
+-----------------------------------------
+ test_part2 $1:a,b,c,d $2:e,f,g,h $3:foo
+(1 row)
+
+-- run on array hash, split two arrays
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+         test_array          
+-----------------------------
+ test_part0 $1:d $2:h $3:foo
+ test_part1 $1:a $2:e $3:foo
+ test_part2 $1:b $2:f $3:foo
+ test_part3 $1:c $2:g $3:foo
+(4 rows)
+
+select * from test_array(null, null, null);
+ test_array 
+------------
+(0 rows)
+
+select * from test_array('{}'::text[], '{}'::text[], 'foo');
+ test_array 
+------------
+(0 rows)
+
+-- run on arg
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+select * from test_array_direct(array[2,3], array['a','b','c','d'], 'foo');
+        test_array_direct         
+----------------------------------
+ test_part2 $1: $2:a,b,c,d $3:foo
+ test_part3 $1: $2:a,b,c,d $3:foo
+(2 rows)
+
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+select * from test_array_direct(array[0,1,2,3], array['a','b','c','d'], 'foo');
+     test_array_direct      
+----------------------------
+ test_part0 $1: $2:a $3:foo
+ test_part1 $1: $2:b $3:foo
+ test_part2 $1: $2:c $3:foo
+ test_part3 $1: $2:d $3:foo
+(4 rows)
+
diff --git a/sql/plproxy_split.sql b/sql/plproxy_split.sql
new file mode 100644 (file)
index 0000000..d17d723
--- /dev/null
@@ -0,0 +1,87 @@
+-- partition functions
+\c test_part0
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+\c test_part1
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+\c test_part2
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+\c test_part3
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+                          || ' $2:' || array_to_string($2, ',')
+                          || ' $3:' || $3;
+$$ language sql;
+
+\c regression
+
+-- invalid arg reference
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $4; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+
+-- invalid arg name
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split x; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+
+-- cannot split more than once
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+
+-- attempt to split non-array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $3; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+
+-- array size/dimensions mismatch
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+select * from test_array(array['a','b','c','d'], null, 'foo');
+select * from test_array(null, array['e','f','g','h'], 'foo');
+select * from test_array(array[array['a1'],array['a2']], array[array['b1'],array['b2']], 'foo');
+
+-- run on array hash, split one array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+
+-- run on text hash, split two arrays (nop split)
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(c);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+
+-- run on array hash, split two arrays
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+select * from test_array(null, null, null);
+select * from test_array('{}'::text[], '{}'::text[], 'foo');
+
+-- run on arg
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+
+select * from test_array_direct(array[2,3], array['a','b','c','d'], 'foo');
+
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+
+select * from test_array_direct(array[0,1,2,3], array['a','b','c','d'], 'foo');
index f314ee5c347044a4942ff3c0ecf041f7a0c8bba7..3feb063de66cd6c0d0b14ec7ba723e5ea4a881a8 100644 (file)
@@ -406,7 +406,7 @@ resolve_query(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *query)
        HeapTuple       row;
        TupleDesc       desc;
 
-       plproxy_query_exec(func, fcinfo, query);
+       plproxy_query_exec(func, fcinfo, query, NULL, 0);
 
        if (SPI_processed != 1)
                plproxy_error(func, "'%s' returned %d rows, expected 1",
index f28114abc9172ecb5d98d1691befb0c95e5d7df6..b8517e94c8c2bccc9ff3b8f17a9d6839d781b078 100644 (file)
@@ -439,7 +439,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
        for (i = 0; i < cluster->conn_count; i++)
        {
                conn = &cluster->conn_list[i];
-               if (!conn->run_on)
+               if (!conn->run_tag)
                        continue;
 
                /* decide what to do */
@@ -482,7 +482,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
        for (i = 0; i < cluster->conn_count; i++)
        {
                conn = &cluster->conn_list[i];
-               if (!conn->run_on)
+               if (!conn->run_tag)
                        continue;
 
                switch (conn->state)
@@ -545,33 +545,32 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn
 
 /* Run the query on all tagged connections in parallel */
 static void
-remote_execute(ProxyFunction *func,
-                          const char **values, int *plengths, int *pformats)
+remote_execute(ProxyFunction *func)
 {
        ExecStatusType err;
        ProxyConnection *conn;
        ProxyCluster *cluster = func->cur_cluster;
        int                     i,
-                               pending;
+                               pending = 0;
        struct timeval now;
 
        /* either launch connection or send query */
        for (i = 0; i < cluster->conn_count; i++)
        {
                conn = &cluster->conn_list[i];
-               if (!conn->run_on)
+               if (!conn->run_tag)
                        continue;
 
                /* check if conn is alive, and launch if not */
                prepare_conn(func, conn);
+               pending++;
 
                /* if conn is ready, then send query away */
                if (conn->state == C_READY)
-                       send_query(func, conn, values, plengths, pformats);
+                       send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
        }
 
        /* now loop until all results are arrived */
-       pending = 1;
        while (pending)
        {
                /* allow postgres to cancel processing */
@@ -587,12 +586,12 @@ remote_execute(ProxyFunction *func,
                for (i = 0; i < cluster->conn_count; i++)
                {
                        conn = &cluster->conn_list[i];
-                       if (!conn->run_on)
+                       if (!conn->run_tag)
                                continue;
 
                        /* login finished, send query */
                        if (conn->state == C_READY)
-                               send_query(func, conn, values, plengths, pformats);
+                               send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
 
                        if (conn->state != C_DONE)
                                pending++;
@@ -606,11 +605,11 @@ remote_execute(ProxyFunction *func,
        {
                conn = &cluster->conn_list[i];
 
-               if ((conn->run_on || conn->res)
-                       && !(conn->run_on && conn->res))
-                       plproxy_error(func, "run_on does not match res");
+               if ((conn->run_tag || conn->res)
+                       && !(conn->run_tag && conn->res))
+                       plproxy_error(func, "run_tag does not match res");
 
-               if (!conn->run_on)
+               if (!conn->run_tag)
                        continue;
 
                if (conn->state != C_DONE)
@@ -661,9 +660,14 @@ remote_cancel(ProxyFunction *func)
        }
 }
 
-/* Run hash function and tag connections */
+/*
+ * Run hash function and tag connections. If any of the hash function 
+ * arguments are mentioned in the split_arrays an element of the array
+ * is used instead of the actual array.
+ */
 static void
-tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
+                                       DatumArray **array_params, int array_row)
 {
        int                     i;
        TupleDesc       desc;
@@ -671,7 +675,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
        ProxyCluster *cluster = func->cur_cluster;
 
        /* execute cached plan */
-       plproxy_query_exec(func, fcinfo, func->hash_sql);
+       plproxy_query_exec(func, fcinfo, func->hash_sql, array_params, array_row);
 
        /* get header */
        desc = SPI_tuptable->tupdesc;
@@ -698,7 +702,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
                        plproxy_error(func, "Hash result must be int2, int4 or int8");
 
                hashval &= cluster->part_mask;
-               cluster->part_map[hashval]->run_on = 1;
+               cluster->part_map[hashval]->run_tag = tag;
        }
 
        /* sanity check */
@@ -708,109 +712,305 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
                                                  " allows hashcount <> 1");
 }
 
-/* Clean old results and prepare for new one */
-void
-plproxy_clean_results(ProxyCluster *cluster)
+/*
+ * Deconstruct an array type to array of Datums, note NULL elements
+ * and determine the element type information.
+ */
+static DatumArray *
+make_datum_array(ProxyFunction *func, ArrayType *v, Oid elem_type)
 {
-       int                     i;
-       ProxyConnection *conn;
+       DatumArray         *da = palloc0(sizeof(*da));
 
-       if (!cluster)
-               return;
-
-       cluster->ret_total = 0;
-       cluster->ret_cur_conn = 0;
+       da->type = plproxy_find_type_info(func, elem_type, true);
 
-       for (i = 0; i < cluster->conn_count; i++)
-       {
-               conn = &cluster->conn_list[i];
-               if (conn->res)
-               {
-                       PQclear(conn->res);
-                       conn->res = NULL;
-               }
-               conn->pos = 0;
-               conn->run_on = 0;
-       }
-       /* conn state checks are done in prepare_conn */
+       if (v)
+               deconstruct_array(v,
+                                                 da->type->type_oid, da->type->length, da->type->by_value,
+                                                 da->type->alignment,
+                                                 &da->values, &da->nulls, &da->elem_count);
+       return da;
 }
 
-/* Select partitions and execute query on them */
-void
-plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+/*
+ * Evaluate the run condition. Tag the matching connections with the specified
+ * tag.
+ *
+ * Note that we don't allow nested plproxy calls on the same cluster (ie.
+ * remote hash functions). The cluster and connection state are global and
+ * would easily get messed up.
+ */
+static void
+tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
+                                         DatumArray **array_params, int array_row)
 {
-       const char *values[FUNC_MAX_ARGS];
-       int                     plengths[FUNC_MAX_ARGS];
-       int                     pformats[FUNC_MAX_ARGS];
-       int                     i;
-       int                     gotbin;
-       ProxyCluster *cluster = func->cur_cluster;
-
-       /* clean old results */
-       plproxy_clean_results(cluster);
+       ProxyCluster   *cluster = func->cur_cluster;
+       int                             i;
 
-       /* tag interesting partitions */
        switch (func->run_type)
        {
                case R_HASH:
-                       tag_hash_partitions(func, fcinfo);
+                       tag_hash_partitions(func, fcinfo, tag, array_params, array_row);
                        break;
                case R_ALL:
                        for (i = 0; i < cluster->part_count; i++)
-                               cluster->part_map[i]->run_on = 1;
+                               cluster->part_map[i]->run_tag = tag;
                        break;
                case R_EXACT:
                        i = func->exact_nr;
                        if (i < 0 || i >= cluster->part_count)
                                plproxy_error(func, "part number out of range");
-                       cluster->part_map[i]->run_on = 1;
+                       cluster->part_map[i]->run_tag = tag;
                        break;
                case R_ANY:
                        i = random() & cluster->part_mask;
-                       cluster->part_map[i]->run_on = 1;
+                       cluster->part_map[i]->run_tag = tag;
                        break;
                default:
                        plproxy_error(func, "uninitialized run_type");
        }
+}
 
-       /* prepare args */
-       gotbin = 0;
-       for (i = 0; i < func->remote_sql->arg_count; i++)
+/*
+ * Tag the partitions to be run on, if split is requested prepare the 
+ * per-partition split array parameters.
+ *
+ * This is done by looping over all of the split arrays side-by-side, for each
+ * tuple see if it satisfies the RUN ON condition. If so, copy the tuple
+ * to the partition's private array parameters.
+ */
+static void
+prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+       int                                     i, row, col;
+       int                                     split_array_len = -1;
+       int                                     split_array_count = 0;
+       ProxyCluster       *cluster = func->cur_cluster;
+       DatumArray                 *arrays_to_split[FUNC_MAX_ARGS];
+
+       /*
+        * See if we have any arrays to split. If so, make them manageable by
+        * converting them to Datum arrays. During the process verify that all
+        * the arrays are of the same length.
+        */
+       for (i = 0; i < func->arg_count; i++)
        {
-               int idx = func->remote_sql->arg_lookup[i];
-               plengths[i] = 0;
-               pformats[i] = 0;
-               if (PG_ARGISNULL(idx))
+               ArrayType          *v;
+
+               if (!IS_SPLIT_ARG(func, i))
                {
-                       values[i] = NULL;
+                       arrays_to_split[i] = NULL;
+                       continue;
                }
+
+               if (PG_ARGISNULL(i))
+                       v = NULL;
                else
                {
-                       bool            bin = cluster->config.disable_binary ? 0 : 1;
+                       v = PG_GETARG_ARRAYTYPE_P(i);
 
-                       values[i] = plproxy_send_type(func->arg_types[idx],
-                                                                                 PG_GETARG_DATUM(idx),
-                                                                                 bin,
-                                                                                 &plengths[i],
-                                                                                 &pformats[i]);
+                       if (ARR_NDIM(v) > 1)
+                               plproxy_error(func, "split multi-dimensional arrays are not supported");
+               }
+
+               arrays_to_split[i] = make_datum_array(func, v, func->arg_types[i]->elem_type);
+
+               /* Check that the element counts match */
+               if (split_array_len < 0)
+                       split_array_len = arrays_to_split[i]->elem_count;
+               else if (arrays_to_split[i]->elem_count != split_array_len)
+                       plproxy_error(func, "split arrays must be of identical lengths");
+
+               ++split_array_count;
+       }
 
-                       if (pformats[i])
-                               gotbin = 1;
+       /* If nothing to split, just tag the partitions and be done with it */
+       if (!split_array_count)
+       {
+               tag_run_on_partitions(func, fcinfo, 1, NULL, 0);
+               return;
+       }
+
+       /* Need to split, evaluate the RUN ON condition for each of the elements. */
+       for (row = 0; row < split_array_len; row++)
+       {
+               int             part;
+               int             my_tag = row+1;
+
+               /*
+                * Tag the run-on partitions with a tag that allows us us to identify
+                * which partitions need the set of elements from this row.
+                */
+               tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row);
+
+               /* Add the array elements to the partitions tagged in previous step */
+               for (part = 0; part < cluster->conn_count; part++)
+               {
+                       ProxyConnection    *conn = &cluster->conn_list[part];
+
+                       if (conn->run_tag != my_tag)
+                               continue;
+
+                       if (!conn->bstate)
+                               conn->bstate = palloc0(func->arg_count * sizeof(*conn->bstate));
+
+                       /* Add this set of elements to the partition specific arrays */
+                       for (col = 0; col < func->arg_count; col++)
+                       {
+                               if (!IS_SPLIT_ARG(func, col))
+                                       continue;
+
+                               conn->bstate[col] = accumArrayResult(conn->bstate[col],
+                                                                                                        arrays_to_split[col]->values[row],
+                                                                                                        arrays_to_split[col]->nulls[row],
+                                                                                                        arrays_to_split[col]->type->type_oid,
+                                                                                                        CurrentMemoryContext);
+                       }
                }
        }
 
        /*
-        * Run query.  On cancel, send cancel request to partitions too.
+        * Finally, copy the accumulated arrays to the actual connections
+        * to be used as parameters.
+        */
+       for (i = 0; i < cluster->conn_count; i++)
+       {
+               ProxyConnection *conn = &cluster->conn_list[i];
+
+               if (!conn->run_tag)
+                       continue;
+
+               conn->split_params = palloc(func->arg_count * sizeof(*conn->split_params));
+
+               for (col = 0; col < func->arg_count; col++)
+               {
+                       if (!IS_SPLIT_ARG(func, col))
+                               conn->split_params[col] = PointerGetDatum(NULL);
+                       else
+                               conn->split_params[col] = makeArrayResult(conn->bstate[col],
+                                                                                                                 CurrentMemoryContext);
+               }
+       }
+}
+
+/*
+ * Prepare parameters for the query.
+ */
+static void
+prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+       int                             i;
+       ProxyCluster   *cluster = func->cur_cluster;
+
+       for (i = 0; i < func->remote_sql->arg_count; i++)
+       {
+               int                     idx = func->remote_sql->arg_lookup[i];
+               bool            bin = cluster->config.disable_binary ? 0 : 1;
+               const char *fixed_param_val = NULL;
+               int                     fixed_param_len, fixed_param_fmt;
+               int                     part;
+
+               /* Avoid doing multiple conversions for fixed parameters */
+               if (!IS_SPLIT_ARG(func, idx) && !PG_ARGISNULL(idx))
+               {
+                       fixed_param_val = plproxy_send_type(func->arg_types[idx],
+                                                                                               PG_GETARG_DATUM(idx),
+                                                                                               bin,
+                                                                                               &fixed_param_len,
+                                                                                               &fixed_param_fmt);
+               }
+
+               /* Add the parameters to partitions */
+               for (part = 0; part < cluster->conn_count; part++)
+               {
+                       ProxyConnection *conn = &cluster->conn_list[part];
+
+                       if (!conn->run_tag)
+                               continue;
+
+                       if (PG_ARGISNULL(idx))
+                       {
+                               conn->param_values[i] = NULL;
+                               conn->param_lengths[i] = 0;
+                               conn->param_formats[i] = 0;
+                       }
+                       else
+                       {
+                               if (IS_SPLIT_ARG(func, idx))
+                               {
+                                       conn->param_values[i] = plproxy_send_type(func->arg_types[idx],
+                                                                                                                         conn->split_params[idx],
+                                                                                                                         bin,
+                                                                                                                         &conn->param_lengths[i],
+                                                                                                                         &conn->param_formats[i]);
+                               }
+                               else
+                               {
+                                       conn->param_values[i] = fixed_param_val;
+                                       conn->param_lengths[i] = fixed_param_len;
+                                       conn->param_formats[i] = fixed_param_fmt;
+                               }
+                       }
+               }
+       }
+}
+
+/* Clean old results and prepare for new one */
+void
+plproxy_clean_results(ProxyCluster *cluster)
+{
+       int                                     i;
+       ProxyConnection    *conn;
+
+       if (!cluster)
+               return;
+
+       cluster->ret_total = 0;
+       cluster->ret_cur_conn = 0;
+
+       for (i = 0; i < cluster->conn_count; i++)
+       {
+               conn = &cluster->conn_list[i];
+               if (conn->res)
+               {
+                       PQclear(conn->res);
+                       conn->res = NULL;
+               }
+               conn->pos = 0;
+               conn->run_tag = 0;
+               conn->bstate = NULL;
+       }
+       /* conn state checks are done in prepare_conn */
+}
+
+/* Select partitions and execute query on them */
+void
+plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+       /*
+        * Prepare parameters and run query.  On cancel, send cancel request to
+        * partitions too.
         */
        PG_TRY();
        {
-               if (gotbin)
-                       remote_execute(func, values, plengths, pformats);
-               else
-                       remote_execute(func, values, NULL, NULL);
+               func->cur_cluster->busy = true;
+
+               /* clean old results */
+               plproxy_clean_results(func->cur_cluster);
+
+               /* tag the partitions and prepare per-partition parameters */
+               prepare_and_tag_partitions(func, fcinfo);
+
+               /* prepare the target query parameters */
+               prepare_query_parameters(func, fcinfo);
+
+               remote_execute(func);
+
+               func->cur_cluster->busy = false;
        }
        PG_CATCH();
        {
+               func->cur_cluster->busy = false;
+
                if (geterrcode() == ERRCODE_QUERY_CANCELED)
                        remote_cancel(func);
                PG_RE_THROW();
index a948925000c8790d033f6b11a04d184788a372e9..42a97e23b924a321a5a8161190a4f33158480db8 100644 (file)
@@ -73,6 +73,63 @@ plproxy_func_strdup(ProxyFunction *func, const char *s)
        return res;
 }
 
+/* Find the index of a named parameter, -1 if not found */
+int
+plproxy_get_parameter_index(ProxyFunction *func, const char *ident)
+{
+       int             i;
+
+       if (ident[0] == '$')
+       {
+               /* Probably a $# parameter reference */
+               i = atoi(ident + 1) - 1;
+               if (i >= 0 && i < func->arg_count)
+                       return i;
+       }
+       else if (func->arg_names)
+       {
+               /* Named parameter, go through the argument names */
+               for (i = 0; i < func->arg_count; i++)
+               {
+                       if (!func->arg_names[i])
+                               continue;
+                       if (pg_strcasecmp(ident, func->arg_names[i]) == 0)
+                               return i;
+               }
+       }
+
+       return -1;
+}
+
+/* Add a new split argument */
+bool
+plproxy_split_add_ident(ProxyFunction *func, const char *ident)
+{
+       int             argindex;
+
+       if ((argindex = plproxy_get_parameter_index(func, ident)) < 0)
+               return false;
+
+       /* Already split? */
+       if (IS_SPLIT_ARG(func, argindex))
+               plproxy_error(func, "SPLIT parameter specified more than once: %s", ident);
+
+       /* Is it an array? */
+       if (!func->arg_types[argindex]->is_array)
+               plproxy_error(func, "SPLIT parameter is not an array: %s", ident);
+
+       if (!func->split_args)
+       {
+               size_t alloc_size = sizeof(*func->split_args) * func->arg_count;
+
+               func->split_args = plproxy_func_alloc(func, alloc_size);
+               MemSet(func->split_args, 0, alloc_size);
+       }
+
+       func->split_args[argindex] = true;
+
+       return true;
+}
 
 /* Initialize PL/Proxy function cache */
 void
@@ -413,11 +470,11 @@ fn_compile(FunctionCallInfo fcinfo,
 
        /* prepare local queries */
        if (f->cluster_sql)
-               plproxy_query_prepare(f, fcinfo, f->cluster_sql);
+               plproxy_query_prepare(f, fcinfo, f->cluster_sql, false);
        if (f->hash_sql)
-               plproxy_query_prepare(f, fcinfo, f->hash_sql);
+               plproxy_query_prepare(f, fcinfo, f->hash_sql, true);
        if (f->connect_sql)
-               plproxy_query_prepare(f, fcinfo, f->connect_sql);
+               plproxy_query_prepare(f, fcinfo, f->connect_sql, false);
 
        /* sanity check */
        if (f->run_type == R_ALL && !fcinfo->flinfo->fn_retset)
index d0a3672db1f3a70b0c29c75564c775977689d022..36441d96b88399a6c40552f022b2b90ecbf7df3e 100644 (file)
@@ -140,6 +140,10 @@ compile_and_execute(FunctionCallInfo fcinfo)
        /* get actual cluster to run on */
        cluster = plproxy_find_cluster(func, fcinfo);
 
+       /* Don't allow nested calls on the same cluster */
+       if (cluster->busy)
+               plproxy_error(func, "Nested PL/Proxy calls to the same cluster are not supported.");
+
        /* fetch PGresults */
        func->cur_cluster = cluster;
        plproxy_exec(func, fcinfo);
index f6c331a646b8e835b2ed13f74cd3148f08d7f5fb..987ef7a5935317e38daf248f8f5fc912176a33f4 100644 (file)
@@ -35,7 +35,7 @@ void plproxy_yy_scan_bytes(const char *bytes, int len);
 static ProxyFunction *xfunc;
 
 /* remember what happened */
-static int got_run, got_cluster, got_connect;
+static int got_run, got_cluster, got_connect, got_split;
 
 static QueryBuffer *cluster_sql;
 static QueryBuffer *select_sql;
@@ -48,7 +48,7 @@ static QueryBuffer *cur_sql;
 /* keep the resetting code together with variables */
 static void reset_parser_vars(void)
 {
-       got_run = got_cluster = got_connect = 0;
+       got_run = got_cluster = got_connect = got_split = 0;
        cur_sql = select_sql = cluster_sql = hash_sql = connect_sql = NULL;
        xfunc = NULL;
 }
@@ -58,7 +58,7 @@ static void reset_parser_vars(void)
 %name-prefix="plproxy_yy"
 
 %token <str> CONNECT CLUSTER RUN ON ALL ANY SELECT
-%token <str> IDENT NUMBER FNCALL STRING
+%token <str> IDENT NUMBER FNCALL SPLIT STRING
 %token <str> SQLIDENT SQLPART
 
 %union
@@ -70,7 +70,7 @@ static void reset_parser_vars(void)
 
 body: | body stmt ;
 
-stmt: cluster_stmt | run_stmt | select_stmt | connect_stmt ;
+stmt: cluster_stmt | split_stmt | run_stmt | select_stmt | connect_stmt ;
 
 connect_stmt: CONNECT connect_spec ';' {
                                        if (got_connect)
@@ -117,6 +117,22 @@ cluster_func: FNCALL       { cluster_sql = plproxy_query_start(xfunc, false);
 cluster_name: STRING   { xfunc->cluster_name = plproxy_func_strdup(xfunc, $1); }
                        ;
 
+split_stmt: SPLIT split_param_list ';' {
+                                                       if (got_split)
+                                                               yyerror("Only one SPLIT statement allowed");
+                                                       got_split = 1;
+                                               }
+                       ;
+
+split_param_list: split_param
+                       | split_param_list ',' split_param
+                       ;
+
+split_param: IDENT {
+                               if (!plproxy_split_add_ident(xfunc, $1))
+                                       yyerror("invalid argument reference: %s", $1);
+                       }
+
 run_stmt: RUN ON run_spec ';'  { if (got_run)
                                                                        yyerror("Only one RUN statement allowed");
                                                                  got_run = 1; }
index 37d97fd2effed5743d9a150ac6b4e17a24765197..8b8d20e628e0a0edd8a6aa83c51e0fbcec447adc 100644 (file)
@@ -36,6 +36,7 @@
 #include <commands/trigger.h>
 #include <mb/pg_wchar.h>
 #include <miscadmin.h>
+#include <utils/array.h>
 #include <utils/builtins.h>
 #include <utils/hsearch.h>
 #include <utils/lsyscache.h>
 #endif
 
 
+/*
+ * Determine if this argument is to SPLIT
+ */
+#define IS_SPLIT_ARG(func, arg)        ((func)->split_args && (func)->split_args[arg])
+
 /*
  * Maintenece period in seconds.  Connnections will be freed
  * from stale results, and checked for lifetime.
@@ -118,9 +124,25 @@ typedef struct
        ConnState       state;                  /* Connection state */
        time_t          connect_time;   /* When connection was started */
        time_t          query_time;             /* When last query was sent */
-       bool            run_on;                 /* True it this connection should be used */
        bool            same_ver;               /* True if dest backend has same X.Y ver */
        bool            tuning;                 /* True if tuning query is running on conn */
+
+       /*
+        * Nonzero if this connection should be used. The actual tag value is only
+        * used by SPLIT processing, others should treat it as a boolean value.
+        */
+       int                     run_tag;
+
+       /*
+        * Per-connection parameters. These are a assigned just before the 
+        * remote call is made.
+        */
+
+       Datum                      *split_params;                                       /* Split array parameters */
+       ArrayBuildState   **bstate;                                                     /* Temporary build state */
+       const char                 *param_values[FUNC_MAX_ARGS];        /* Parameter values */
+       int                                     param_lengths[FUNC_MAX_ARGS];   /* Parameter lengths (binary io) */
+       int                                     param_formats[FUNC_MAX_ARGS];   /* Parameter formats (binary io) */
 } ProxyConnection;
 
 /* Info about one cluster */
@@ -142,6 +164,8 @@ typedef struct ProxyCluster
        int                     ret_cur_conn;   /* Result walking: index of current conn */
        int                     ret_cur_pos;    /* Result walking: index of current row */
        int                     ret_total;              /* Result walking: total rows left */
+
+       bool            busy;                   /* True if the cluster is already involved in execution */
 } ProxyCluster;
 
 /*
@@ -161,6 +185,10 @@ typedef struct ProxyType
        bool            has_send;               /* Has binary output */
        bool            has_recv;               /* Has binary input */
        bool            by_value;               /* False if Datum is a pointer to data */
+       char            alignment;              /* Type alignment */
+       bool            is_array;               /* True if array */
+       Oid                     elem_type;              /* Array element type */
+       short           length;                 /* Type length */
 
        /* I/O functions */
        union
@@ -207,6 +235,17 @@ typedef struct ProxyQuery
        void       *plan;                       /* Optional prepared plan for local queries */
 } ProxyQuery;
 
+/*
+ * Deconstructed array parameters
+ */
+typedef struct DatumArray
+{
+       ProxyType  *type;
+       Datum      *values;
+       bool       *nulls;
+       int                     elem_count;
+} DatumArray;
+
 /*
  * Complete info about compiled function.
  *
@@ -224,6 +263,8 @@ typedef struct ProxyFunction
        char      **arg_names;          /* Argument names, may contain NULLs */
        short           arg_count;              /* Argument count of proxy function */
 
+       bool       *split_args;         /* Map of arguments to split */
+
        /* if the function returns untyped RECORD that needs AS clause */
        bool            dynamic_record;
 
@@ -272,6 +313,8 @@ void                plproxy_error(ProxyFunction *func, const char *fmt,...);
 void           plproxy_function_cache_init(void);
 void      *plproxy_func_alloc(ProxyFunction *func, int size);
 char      *plproxy_func_strdup(ProxyFunction *func, const char *s);
+int                    plproxy_get_parameter_index(ProxyFunction *func, const char *ident);
+bool           plproxy_split_add_ident(ProxyFunction *func, const char *ident);
 ProxyFunction *plproxy_compile(FunctionCallInfo fcinfo, bool validate);
 
 /* execute.c */
@@ -312,8 +355,9 @@ bool                plproxy_query_add_const(QueryBuffer *q, const char *data);
 bool           plproxy_query_add_ident(QueryBuffer *q, const char *ident);
 ProxyQuery *plproxy_query_finish(QueryBuffer *q);
 ProxyQuery *plproxy_standard_query(ProxyFunction *func, bool add_types);
-void           plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
-void           plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
+void           plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support);
+void           plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q,
+                                                          DatumArray **array_params, int array_row);
 void           plproxy_query_freeplan(ProxyQuery *q);
 
 #endif
index c734bf0ba83e0a0bc0c0b9c57f15f827da51d1c5..d0e0577a2818251705f1c5b4887caa685f33700e 100644 (file)
@@ -90,25 +90,8 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident)
                                fn_idx = -1,
                                sql_idx = -1;
 
-       if (ident[0] == '$')
-       {
-               fn_idx = atoi(ident + 1) - 1;
-               if (fn_idx < 0 || fn_idx >= q->func->arg_count)
-                       return false;
-       }
-       else if (q->func->arg_names)
-       {
-               for (i = 0; i < q->func->arg_count; i++)
-               {
-                       if (!q->func->arg_names[i])
-                               continue;
-                       if (pg_strcasecmp(ident, q->func->arg_names[i]) == 0)
-                       {
-                               fn_idx = i;
-                               break;
-                       }
-               }
-       }
+       fn_idx = plproxy_get_parameter_index(q->func, ident);
+
        if (fn_idx >= 0)
        {
                for (i = 0; i < q->arg_count; i++)
@@ -127,7 +110,12 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident)
                add_ref(q->sql, sql_idx, q->func, fn_idx, q->add_types);
        }
        else
+       {
+               if (ident[0] == '$')
+                       return false;
                appendStringInfoString(q->sql, ident);
+       }
+
        return true;
 }
 
@@ -149,6 +137,7 @@ plproxy_query_finish(QueryBuffer *q)
        len = q->arg_count * sizeof(int);
        pq->arg_lookup = palloc(len);
        pq->plan = NULL;
+
        memcpy(pq->arg_lookup, q->arg_lookup, len);
 
        MemoryContextSwitchTo(old);
@@ -247,7 +236,7 @@ plproxy_standard_query(ProxyFunction *func, bool add_types)
  * Prepare ProxyQuery for local execution
  */
 void
-plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support)
 {
        int                     i;
        Oid                     types[FUNC_MAX_ARGS];
@@ -258,7 +247,11 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *
        {
                int                     idx = q->arg_lookup[i];
 
-               types[i] = func->arg_types[idx]->type_oid;
+               if (split_support && IS_SPLIT_ARG(func, idx))
+                       /* for SPLIT arguments use array element type instead */
+                       types[i] = func->arg_types[idx]->elem_type;
+               else 
+                       types[i] = func->arg_types[idx]->type_oid;
        }
 
        /* prepare & store plan */
@@ -272,12 +265,12 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *
  * Result will be in SPI_tuptable.
  */
 void
-plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q,
+                                  DatumArray **array_params, int array_row)
 {
        int                     i,
                                idx,
                                err;
-       ProxyType  *type;
        char            arg_nulls[FUNC_MAX_ARGS];
        Datum           arg_values[FUNC_MAX_ARGS];
 
@@ -285,12 +278,19 @@ plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
        for (i = 0; i < q->arg_count; i++)
        {
                idx = q->arg_lookup[i];
-               type = func->arg_types[idx];
+
                if (PG_ARGISNULL(idx))
                {
                        arg_nulls[i] = 'n';
                        arg_values[i] = (Datum) NULL;
                }
+               else if (array_params && IS_SPLIT_ARG(func, idx))
+               {
+                       DatumArray *ats = array_params[idx];
+
+                       arg_nulls[i] = ats->nulls[array_row] ? 'n' : ' ';
+                       arg_values[i] = ats->nulls[array_row] ? (Datum) NULL : ats->values[array_row];
+               }
                else
                {
                        arg_nulls[i] = ' ';
index cb203b7db041bc02a03fd16f8183ca09e3f43bf9..36166c9a7180d582201b8055bb91bd839475b387 100644 (file)
@@ -179,6 +179,7 @@ run                 { return RUN; }
 on                     { return ON; }
 all                    { return ALL; }
 any                    { return ANY; }
+split          { return SPLIT; }
 select                 { BEGIN(sql); yylval.str = yytext; return SELECT; }
 
        /* function call */
index dd7165db2e40c5c27c81423017ea6bb7abf81ee8..3679d832709bbaa004d0c7d89724bf071de0d528 100644 (file)
@@ -253,6 +253,10 @@ plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send)
        type->for_send = for_send;
        type->by_value = s_type->typbyval;
        type->name = plproxy_func_strdup(func, namebuf);
+       type->is_array = (s_type->typelem != 0 && s_type->typlen == -1);
+       type->elem_type = s_type->typelem;
+       type->alignment = s_type->typalign;
+       type->length = s_type->typlen;
 
        /* decide what function is needed */
        if (for_send)