It will split incoming array(s) into per-partition arrays.
Roughly based on design here:
http://lists.pgfoundry.org/pipermail/plproxy-users/2008-June/000093.html
Written by Martin Pihlak
# regression testing setup
REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \
plproxy_errors plproxy_clustermap plproxy_dynamic_record \
- plproxy_encoding
+ plproxy_encoding plproxy_split
REGRESS_OPTS = --load-language=plpgsql
# load PGXS makefile
Take hash value directly from function argument.
+
+== SPLIT ==
+
+ SPLIT array_arg_1 [ , array_arg_2 ... ] ;
+
+Split the input arrays into per-partition arrays based on the RUN ON statement.
+This is done by evaluating RUN ON condition for each array element and building
+per-partition parameter arrays for each matching partition. During execution
+each tagged partition then gets its own subset of the array to process.
+
+The semantics of the RUN ON statement are slightly changed with SPLIT arrays:
+
+ RUN ON partition_func(..);
+
+The array is split between the partitions matching `partition_func()`. Any
+SPLIT parameters passed to the function are actually replaced with the
+individual array elements.
+
+ RUN ON argname; RUN ON $1;
+
+An array of partition numbers (or hashes) can be passed as `argname`. The function
+shall be run on the partitions specified in the array.
+
+ RUN ON ANY;
+
+Each element is assigned to a random partition.
+
+ RUN ON ALL;
+ RUN ON <NR>;
+
+Unaffected, except for the added overhead of array copying.
+
+Example:
+
+ CREATE FUNCTION set_profiles(i_users text[], i_profiles text[])
+ RETURNS SETOF text AS $$
+ CLUSTER 'userdb';
+ SPLIT i_users, i_profiles;
+ RUN ON hashtext(i_users);
+ $$ LANGUAGE plproxy;
+
+Given query:
+
+ SELECT * FROM set_profiles(ARRAY['foo', 'bar'], ARRAY['a', 'b']);
+
+The hash function is called 2 times:
+
+ SELECT * FROM hashtext('foo');
+ SELECT * FROM hashtext('bar');
+
+And target partitions get queries:
+
+ SELECT * FROM set_profiles(ARRAY['foo'], ARRAY['a']);
+ SELECT * FROM set_profiles(ARRAY['bar'], ARRAY['b']);
+
+
== SELECT ==
SELECT .... ;
== Just thoughts ==
- * SPREAD BY clause for OLAP loads.
* Drop plproxy.get_cluster_config()...
* Dynamic connstr loading for CONNECT functions?
--- /dev/null
+-- partition functions
+\c test_part0
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+\c test_part1
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+\c test_part2
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+\c test_part3
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+\c regression
+-- invalid arg reference
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $4; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+ERROR: PL/Proxy function public.test_array(3): Compile error at line 1: invalid argument reference: $4
+-- invalid arg name
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split x; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+ERROR: PL/Proxy function public.test_array(3): Compile error at line 1: invalid argument reference: x
+-- cannot split more than once
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+ERROR: PL/Proxy function public.test_array(3): SPLIT parameter specified more than once: b
+-- attempt to split non-array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $3; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+ERROR: PL/Proxy function public.test_array(3): SPLIT parameter is not an array: $3
+-- array size/dimensions mismatch
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+ERROR: PL/Proxy function public.test_array(3): split arrays must be of identical lengths
+select * from test_array(array['a','b','c','d'], null, 'foo');
+ERROR: PL/Proxy function public.test_array(3): split arrays must be of identical lengths
+select * from test_array(null, array['e','f','g','h'], 'foo');
+ERROR: PL/Proxy function public.test_array(3): split arrays must be of identical lengths
+select * from test_array(array[array['a1'],array['a2']], array[array['b1'],array['b2']], 'foo');
+ERROR: PL/Proxy function public.test_array(3): split multi-dimensional arrays are not supported
+-- run on array hash, split one array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+ test_array
+-----------------------------------
+ test_part0 $1:d $2:e,f,g,h $3:foo
+ test_part1 $1:a $2:e,f,g,h $3:foo
+ test_part2 $1:b $2:e,f,g,h $3:foo
+ test_part3 $1:c $2:e,f,g,h $3:foo
+(4 rows)
+
+-- run on text hash, split two arrays (nop split)
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(c);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+ test_array
+-----------------------------------------
+ test_part2 $1:a,b,c,d $2:e,f,g,h $3:foo
+(1 row)
+
+-- run on array hash, split two arrays
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+ test_array
+-----------------------------
+ test_part0 $1:d $2:h $3:foo
+ test_part1 $1:a $2:e $3:foo
+ test_part2 $1:b $2:f $3:foo
+ test_part3 $1:c $2:g $3:foo
+(4 rows)
+
+select * from test_array(null, null, null);
+ test_array
+------------
+(0 rows)
+
+select * from test_array('{}'::text[], '{}'::text[], 'foo');
+ test_array
+------------
+(0 rows)
+
+-- run on arg
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+select * from test_array_direct(array[2,3], array['a','b','c','d'], 'foo');
+ test_array_direct
+----------------------------------
+ test_part2 $1: $2:a,b,c,d $3:foo
+ test_part3 $1: $2:a,b,c,d $3:foo
+(2 rows)
+
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+select * from test_array_direct(array[0,1,2,3], array['a','b','c','d'], 'foo');
+ test_array_direct
+----------------------------
+ test_part0 $1: $2:a $3:foo
+ test_part1 $1: $2:b $3:foo
+ test_part2 $1: $2:c $3:foo
+ test_part3 $1: $2:d $3:foo
+(4 rows)
+
--- /dev/null
+-- partition functions
+\c test_part0
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+\c test_part1
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+\c test_part2
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+\c test_part3
+create or replace function test_array(a text[], b text[], c text) returns text as
+$$
+select current_database() || ' $1:' || array_to_string($1, ',')
+ || ' $2:' || array_to_string($2, ',')
+ || ' $3:' || $3;
+$$ language sql;
+
+\c regression
+
+-- invalid arg reference
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $4; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+
+-- invalid arg name
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split x; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+
+-- cannot split more than once
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+
+-- attempt to split non-array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split $3; cluster 'testcluster'; run on 0;$$ language plproxy;
+select * from test_array(array['a'], array['g'], 'foo');
+
+-- array size/dimensions mismatch
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on 0; $$ language plproxy;
+select * from test_array(array['a'], array['b', 'c'], 'foo');
+select * from test_array(array['a','b','c','d'], null, 'foo');
+select * from test_array(null, array['e','f','g','h'], 'foo');
+select * from test_array(array[array['a1'],array['a2']], array[array['b1'],array['b2']], 'foo');
+
+-- run on array hash, split one array
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+
+-- run on text hash, split two arrays (nop split)
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(c);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+
+-- run on array hash, split two arrays
+create or replace function test_array(a text[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on ascii(a);$$ language plproxy;
+select * from test_array(array['a','b','c','d'], array['e','f','g','h'], 'foo');
+select * from test_array(null, null, null);
+select * from test_array('{}'::text[], '{}'::text[], 'foo');
+
+-- run on arg
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+
+select * from test_array_direct(array[2,3], array['a','b','c','d'], 'foo');
+
+create or replace function test_array_direct(a integer[], b text[], c text) returns setof text as
+$$ split a, b; cluster 'testcluster'; run on a; select test_array('{}'::text[], b, c);$$ language plproxy;
+
+select * from test_array_direct(array[0,1,2,3], array['a','b','c','d'], 'foo');
HeapTuple row;
TupleDesc desc;
- plproxy_query_exec(func, fcinfo, query);
+ plproxy_query_exec(func, fcinfo, query, NULL, 0);
if (SPI_processed != 1)
plproxy_error(func, "'%s' returned %d rows, expected 1",
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
/* decide what to do */
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
switch (conn->state)
/* Run the query on all tagged connections in parallel */
static void
-remote_execute(ProxyFunction *func,
- const char **values, int *plengths, int *pformats)
+remote_execute(ProxyFunction *func)
{
ExecStatusType err;
ProxyConnection *conn;
ProxyCluster *cluster = func->cur_cluster;
int i,
- pending;
+ pending = 0;
struct timeval now;
/* either launch connection or send query */
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
/* check if conn is alive, and launch if not */
prepare_conn(func, conn);
+ pending++;
/* if conn is ready, then send query away */
if (conn->state == C_READY)
- send_query(func, conn, values, plengths, pformats);
+ send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
}
/* now loop until all results are arrived */
- pending = 1;
while (pending)
{
/* allow postgres to cancel processing */
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
/* login finished, send query */
if (conn->state == C_READY)
- send_query(func, conn, values, plengths, pformats);
+ send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
if (conn->state != C_DONE)
pending++;
{
conn = &cluster->conn_list[i];
- if ((conn->run_on || conn->res)
- && !(conn->run_on && conn->res))
- plproxy_error(func, "run_on does not match res");
+ if ((conn->run_tag || conn->res)
+ && !(conn->run_tag && conn->res))
+ plproxy_error(func, "run_tag does not match res");
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
if (conn->state != C_DONE)
}
}
-/* Run hash function and tag connections */
+/*
+ * Run hash function and tag connections. If any of the hash function
+ * arguments are mentioned in the split_arrays an element of the array
+ * is used instead of the actual array.
+ */
static void
-tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
+ DatumArray **array_params, int array_row)
{
int i;
TupleDesc desc;
ProxyCluster *cluster = func->cur_cluster;
/* execute cached plan */
- plproxy_query_exec(func, fcinfo, func->hash_sql);
+ plproxy_query_exec(func, fcinfo, func->hash_sql, array_params, array_row);
/* get header */
desc = SPI_tuptable->tupdesc;
plproxy_error(func, "Hash result must be int2, int4 or int8");
hashval &= cluster->part_mask;
- cluster->part_map[hashval]->run_on = 1;
+ cluster->part_map[hashval]->run_tag = tag;
}
/* sanity check */
" allows hashcount <> 1");
}
-/* Clean old results and prepare for new one */
-void
-plproxy_clean_results(ProxyCluster *cluster)
+/*
+ * Deconstruct an array type to array of Datums, note NULL elements
+ * and determine the element type information.
+ */
+static DatumArray *
+make_datum_array(ProxyFunction *func, ArrayType *v, Oid elem_type)
{
- int i;
- ProxyConnection *conn;
+ DatumArray *da = palloc0(sizeof(*da));
- if (!cluster)
- return;
-
- cluster->ret_total = 0;
- cluster->ret_cur_conn = 0;
+ da->type = plproxy_find_type_info(func, elem_type, true);
- for (i = 0; i < cluster->conn_count; i++)
- {
- conn = &cluster->conn_list[i];
- if (conn->res)
- {
- PQclear(conn->res);
- conn->res = NULL;
- }
- conn->pos = 0;
- conn->run_on = 0;
- }
- /* conn state checks are done in prepare_conn */
+ if (v)
+ deconstruct_array(v,
+ da->type->type_oid, da->type->length, da->type->by_value,
+ da->type->alignment,
+ &da->values, &da->nulls, &da->elem_count);
+ return da;
}
-/* Select partitions and execute query on them */
-void
-plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+/*
+ * Evaluate the run condition. Tag the matching connections with the specified
+ * tag.
+ *
+ * Note that we don't allow nested plproxy calls on the same cluster (ie.
+ * remote hash functions). The cluster and connection state are global and
+ * would easily get messed up.
+ */
+static void
+tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
+ DatumArray **array_params, int array_row)
{
- const char *values[FUNC_MAX_ARGS];
- int plengths[FUNC_MAX_ARGS];
- int pformats[FUNC_MAX_ARGS];
- int i;
- int gotbin;
- ProxyCluster *cluster = func->cur_cluster;
-
- /* clean old results */
- plproxy_clean_results(cluster);
+ ProxyCluster *cluster = func->cur_cluster;
+ int i;
- /* tag interesting partitions */
switch (func->run_type)
{
case R_HASH:
- tag_hash_partitions(func, fcinfo);
+ tag_hash_partitions(func, fcinfo, tag, array_params, array_row);
break;
case R_ALL:
for (i = 0; i < cluster->part_count; i++)
- cluster->part_map[i]->run_on = 1;
+ cluster->part_map[i]->run_tag = tag;
break;
case R_EXACT:
i = func->exact_nr;
if (i < 0 || i >= cluster->part_count)
plproxy_error(func, "part number out of range");
- cluster->part_map[i]->run_on = 1;
+ cluster->part_map[i]->run_tag = tag;
break;
case R_ANY:
i = random() & cluster->part_mask;
- cluster->part_map[i]->run_on = 1;
+ cluster->part_map[i]->run_tag = tag;
break;
default:
plproxy_error(func, "uninitialized run_type");
}
+}
- /* prepare args */
- gotbin = 0;
- for (i = 0; i < func->remote_sql->arg_count; i++)
+/*
+ * Tag the partitions to be run on, if split is requested prepare the
+ * per-partition split array parameters.
+ *
+ * This is done by looping over all of the split arrays side-by-side, for each
+ * tuple see if it satisfies the RUN ON condition. If so, copy the tuple
+ * to the partition's private array parameters.
+ */
+static void
+prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+ int i, row, col;
+ int split_array_len = -1;
+ int split_array_count = 0;
+ ProxyCluster *cluster = func->cur_cluster;
+ DatumArray *arrays_to_split[FUNC_MAX_ARGS];
+
+ /*
+ * See if we have any arrays to split. If so, make them manageable by
+ * converting them to Datum arrays. During the process verify that all
+ * the arrays are of the same length.
+ */
+ for (i = 0; i < func->arg_count; i++)
{
- int idx = func->remote_sql->arg_lookup[i];
- plengths[i] = 0;
- pformats[i] = 0;
- if (PG_ARGISNULL(idx))
+ ArrayType *v;
+
+ if (!IS_SPLIT_ARG(func, i))
{
- values[i] = NULL;
+ arrays_to_split[i] = NULL;
+ continue;
}
+
+ if (PG_ARGISNULL(i))
+ v = NULL;
else
{
- bool bin = cluster->config.disable_binary ? 0 : 1;
+ v = PG_GETARG_ARRAYTYPE_P(i);
- values[i] = plproxy_send_type(func->arg_types[idx],
- PG_GETARG_DATUM(idx),
- bin,
- &plengths[i],
- &pformats[i]);
+ if (ARR_NDIM(v) > 1)
+ plproxy_error(func, "split multi-dimensional arrays are not supported");
+ }
+
+ arrays_to_split[i] = make_datum_array(func, v, func->arg_types[i]->elem_type);
+
+ /* Check that the element counts match */
+ if (split_array_len < 0)
+ split_array_len = arrays_to_split[i]->elem_count;
+ else if (arrays_to_split[i]->elem_count != split_array_len)
+ plproxy_error(func, "split arrays must be of identical lengths");
+
+ ++split_array_count;
+ }
- if (pformats[i])
- gotbin = 1;
+ /* If nothing to split, just tag the partitions and be done with it */
+ if (!split_array_count)
+ {
+ tag_run_on_partitions(func, fcinfo, 1, NULL, 0);
+ return;
+ }
+
+ /* Need to split, evaluate the RUN ON condition for each of the elements. */
+ for (row = 0; row < split_array_len; row++)
+ {
+ int part;
+ int my_tag = row+1;
+
+ /*
+ * Tag the run-on partitions with a tag that allows us to identify
+ * which partitions need the set of elements from this row.
+ */
+ tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row);
+
+ /* Add the array elements to the partitions tagged in previous step */
+ for (part = 0; part < cluster->conn_count; part++)
+ {
+ ProxyConnection *conn = &cluster->conn_list[part];
+
+ if (conn->run_tag != my_tag)
+ continue;
+
+ if (!conn->bstate)
+ conn->bstate = palloc0(func->arg_count * sizeof(*conn->bstate));
+
+ /* Add this set of elements to the partition specific arrays */
+ for (col = 0; col < func->arg_count; col++)
+ {
+ if (!IS_SPLIT_ARG(func, col))
+ continue;
+
+ conn->bstate[col] = accumArrayResult(conn->bstate[col],
+ arrays_to_split[col]->values[row],
+ arrays_to_split[col]->nulls[row],
+ arrays_to_split[col]->type->type_oid,
+ CurrentMemoryContext);
+ }
}
}
/*
- * Run query. On cancel, send cancel request to partitions too.
+ * Finally, copy the accumulated arrays to the actual connections
+ * to be used as parameters.
+ */
+ for (i = 0; i < cluster->conn_count; i++)
+ {
+ ProxyConnection *conn = &cluster->conn_list[i];
+
+ if (!conn->run_tag)
+ continue;
+
+ conn->split_params = palloc(func->arg_count * sizeof(*conn->split_params));
+
+ for (col = 0; col < func->arg_count; col++)
+ {
+ if (!IS_SPLIT_ARG(func, col))
+ conn->split_params[col] = PointerGetDatum(NULL);
+ else
+ conn->split_params[col] = makeArrayResult(conn->bstate[col],
+ CurrentMemoryContext);
+ }
+ }
+}
+
+/*
+ * Prepare parameters for the query.
+ *
+ * For every argument referenced by the remote query, fill in the
+ * param_values/param_lengths/param_formats slots of each tagged
+ * connection:
+ *   - NULL arguments are sent as NULL,
+ *   - SPLIT arguments are serialized separately for each connection from
+ *     that connection's split_params entry,
+ *   - all other arguments are serialized once and the result is shared by
+ *     all tagged connections.
+ */
+static void
+prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+	ProxyCluster *cluster = func->cur_cluster;
+	bool		bin = cluster->config.disable_binary ? 0 : 1;
+	int			i;
+
+	for (i = 0; i < func->remote_sql->arg_count; i++)
+	{
+		int			idx = func->remote_sql->arg_lookup[i];
+		bool		is_null = PG_ARGISNULL(idx);
+		bool		is_split = IS_SPLIT_ARG(func, idx);
+		const char *fixed_param_val = NULL;
+		int			fixed_param_len = 0;
+		int			fixed_param_fmt = 0;
+		int			part;
+
+		/* Avoid doing multiple conversions for fixed parameters */
+		if (!is_split && !is_null)
+		{
+			fixed_param_val = plproxy_send_type(func->arg_types[idx],
+												PG_GETARG_DATUM(idx),
+												bin,
+												&fixed_param_len,
+												&fixed_param_fmt);
+		}
+
+		/* Add the parameters to partitions */
+		for (part = 0; part < cluster->conn_count; part++)
+		{
+			ProxyConnection *conn = &cluster->conn_list[part];
+
+			/* untagged connections take no part in this call */
+			if (!conn->run_tag)
+				continue;
+
+			if (is_null)
+			{
+				conn->param_values[i] = NULL;
+				conn->param_lengths[i] = 0;
+				conn->param_formats[i] = 0;
+			}
+			else if (is_split)
+			{
+				/* each connection sends its own slice of the array */
+				conn->param_values[i] = plproxy_send_type(func->arg_types[idx],
+														  conn->split_params[idx],
+														  bin,
+														  &conn->param_lengths[i],
+														  &conn->param_formats[i]);
+			}
+			else
+			{
+				conn->param_values[i] = fixed_param_val;
+				conn->param_lengths[i] = fixed_param_len;
+				conn->param_formats[i] = fixed_param_fmt;
+			}
+		}
+	}
+}
+
+
+/*
+ * Clean old results and prepare for new one.
+ *
+ * Resets the cluster's result-walking counters and, for every connection,
+ * clears any pending PGresult, the row position, the run tag and the
+ * per-call SPLIT pointers (bstate and split_params). Does nothing when
+ * cluster is NULL.
+ */
+void
+plproxy_clean_results(ProxyCluster *cluster)
+{
+	int			i;
+	ProxyConnection *conn;
+
+	if (!cluster)
+		return;
+
+	cluster->ret_total = 0;
+	cluster->ret_cur_conn = 0;
+
+	for (i = 0; i < cluster->conn_count; i++)
+	{
+		conn = &cluster->conn_list[i];
+		if (conn->res)
+		{
+			PQclear(conn->res);
+			conn->res = NULL;
+		}
+		conn->pos = 0;
+		conn->run_tag = 0;
+
+		/*
+		 * Both pointers are assigned afresh by each call that uses SPLIT,
+		 * so do not keep values from an earlier call around.
+		 */
+		conn->bstate = NULL;
+		conn->split_params = NULL;
+	}
+	/* conn state checks are done in prepare_conn */
+}
+
+
+/* Select partitions and execute query on them */
+void
+plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+ /*
+ * Prepare parameters and run query. On cancel, send cancel request to
+ * partitions too.
*/
PG_TRY();
{
- if (gotbin)
- remote_execute(func, values, plengths, pformats);
- else
- remote_execute(func, values, NULL, NULL);
+ func->cur_cluster->busy = true;
+
+ /* clean old results */
+ plproxy_clean_results(func->cur_cluster);
+
+ /* tag the partitions and prepare per-partition parameters */
+ prepare_and_tag_partitions(func, fcinfo);
+
+ /* prepare the target query parameters */
+ prepare_query_parameters(func, fcinfo);
+
+ remote_execute(func);
+
+ func->cur_cluster->busy = false;
}
PG_CATCH();
{
+ func->cur_cluster->busy = false;
+
if (geterrcode() == ERRCODE_QUERY_CANCELED)
remote_cancel(func);
PG_RE_THROW();
return res;
}
+/*
+ * Find the zero-based index of the function argument referenced by ident.
+ *
+ * ident is either a positional reference ("$1" is the first argument) or
+ * an argument name, which is compared case-insensitively against
+ * func->arg_names. Returns -1 if ident does not refer to any argument.
+ */
+int
+plproxy_get_parameter_index(ProxyFunction *func, const char *ident)
+{
+	int			i;
+
+	if (ident[0] == '$')
+	{
+		/*
+		 * Probably a $# parameter reference. strtol() is used rather than
+		 * atoi() because atoi() has undefined behavior when the number is
+		 * out of range; strtol() saturates instead, and the saturated value
+		 * fails the range check below.
+		 */
+		long		nr = strtol(ident + 1, NULL, 10);
+
+		if (nr >= 1 && nr <= func->arg_count)
+			return (int) (nr - 1);
+	}
+	else if (func->arg_names)
+	{
+		/* Named parameter, go through the argument names */
+		for (i = 0; i < func->arg_count; i++)
+		{
+			/* unnamed arguments have a NULL entry */
+			if (!func->arg_names[i])
+				continue;
+			if (pg_strcasecmp(ident, func->arg_names[i]) == 0)
+				return i;
+		}
+	}
+
+	return -1;
+}
+
+/*
+ * Mark the argument referenced by ident as a SPLIT argument.
+ *
+ * Returns false when ident does not refer to a function argument. Reports
+ * an error through plproxy_error() when the argument was already marked
+ * or is not of an array type. The split_args map is allocated on first
+ * use, sized by the function's argument count and zero-filled.
+ */
+bool
+plproxy_split_add_ident(ProxyFunction *func, const char *ident)
+{
+	int			idx = plproxy_get_parameter_index(func, ident);
+
+	if (idx < 0)
+		return false;
+
+	/* Each argument may be listed only once */
+	if (IS_SPLIT_ARG(func, idx))
+		plproxy_error(func, "SPLIT parameter specified more than once: %s", ident);
+
+	/* Only array arguments can be split */
+	if (!func->arg_types[idx]->is_array)
+		plproxy_error(func, "SPLIT parameter is not an array: %s", ident);
+
+	/* Create the map lazily, the first time a SPLIT argument is seen */
+	if (func->split_args == NULL)
+	{
+		size_t		map_bytes = sizeof(*func->split_args) * func->arg_count;
+
+		func->split_args = plproxy_func_alloc(func, map_bytes);
+		MemSet(func->split_args, 0, map_bytes);
+	}
+
+	func->split_args[idx] = true;
+	return true;
+}
/* Initialize PL/Proxy function cache */
void
/* prepare local queries */
if (f->cluster_sql)
- plproxy_query_prepare(f, fcinfo, f->cluster_sql);
+ plproxy_query_prepare(f, fcinfo, f->cluster_sql, false);
if (f->hash_sql)
- plproxy_query_prepare(f, fcinfo, f->hash_sql);
+ plproxy_query_prepare(f, fcinfo, f->hash_sql, true);
if (f->connect_sql)
- plproxy_query_prepare(f, fcinfo, f->connect_sql);
+ plproxy_query_prepare(f, fcinfo, f->connect_sql, false);
/* sanity check */
if (f->run_type == R_ALL && !fcinfo->flinfo->fn_retset)
/* get actual cluster to run on */
cluster = plproxy_find_cluster(func, fcinfo);
+ /* Don't allow nested calls on the same cluster */
+ if (cluster->busy)
+ plproxy_error(func, "Nested PL/Proxy calls to the same cluster are not supported.");
+
/* fetch PGresults */
func->cur_cluster = cluster;
plproxy_exec(func, fcinfo);
static ProxyFunction *xfunc;
/* remember what happened */
-static int got_run, got_cluster, got_connect;
+static int got_run, got_cluster, got_connect, got_split;
static QueryBuffer *cluster_sql;
static QueryBuffer *select_sql;
/* keep the resetting code together with variables */
static void reset_parser_vars(void)
{
- got_run = got_cluster = got_connect = 0;
+ got_run = got_cluster = got_connect = got_split = 0;
cur_sql = select_sql = cluster_sql = hash_sql = connect_sql = NULL;
xfunc = NULL;
}
%name-prefix="plproxy_yy"
%token <str> CONNECT CLUSTER RUN ON ALL ANY SELECT
-%token <str> IDENT NUMBER FNCALL STRING
+%token <str> IDENT NUMBER FNCALL SPLIT STRING
%token <str> SQLIDENT SQLPART
%union
body: | body stmt ;
-stmt: cluster_stmt | run_stmt | select_stmt | connect_stmt ;
+stmt: cluster_stmt | split_stmt | run_stmt | select_stmt | connect_stmt ;
connect_stmt: CONNECT connect_spec ';' {
if (got_connect)
cluster_name: STRING { xfunc->cluster_name = plproxy_func_strdup(xfunc, $1); }
;
+split_stmt: SPLIT split_param_list ';' {
+ if (got_split)
+ yyerror("Only one SPLIT statement allowed");
+ got_split = 1;
+ }
+ ;
+
+split_param_list: split_param
+ | split_param_list ',' split_param
+ ;
+
+split_param: IDENT {
+				/* each identifier must name an argument of the proxy function */
+				if (!plproxy_split_add_ident(xfunc, $1))
+					yyerror("invalid argument reference: %s", $1);
+			}
+			;
+
run_stmt: RUN ON run_spec ';' { if (got_run)
yyerror("Only one RUN statement allowed");
got_run = 1; }
#include <commands/trigger.h>
#include <mb/pg_wchar.h>
#include <miscadmin.h>
+#include <utils/array.h>
#include <utils/builtins.h>
#include <utils/hsearch.h>
#include <utils/lsyscache.h>
#endif
+/*
+ * Determine if this argument is to be SPLIT.
+ *
+ * Evaluates to true when func has a split_args map and the map entry for
+ * argument index arg is set. A function without any SPLIT arguments has
+ * no map, so the macro is false for all of its arguments.
+ * Note that func is evaluated twice.
+ */
+#define IS_SPLIT_ARG(func, arg) ((func)->split_args && (func)->split_args[arg])
+
/*
* Maintenece period in seconds. Connnections will be freed
* from stale results, and checked for lifetime.
ConnState state; /* Connection state */
time_t connect_time; /* When connection was started */
time_t query_time; /* When last query was sent */
- bool run_on; /* True it this connection should be used */
bool same_ver; /* True if dest backend has same X.Y ver */
bool tuning; /* True if tuning query is running on conn */
+
+ /*
+ * Nonzero if this connection should be used. The actual tag value is only
+ * used by SPLIT processing, others should treat it as a boolean value.
+ */
+ int run_tag;
+
+ /*
+ * Per-connection parameters. These are assigned just before the
+ * remote call is made.
+ */
+
+ Datum *split_params; /* Split array parameters */
+ ArrayBuildState **bstate; /* Temporary build state */
+ const char *param_values[FUNC_MAX_ARGS]; /* Parameter values */
+ int param_lengths[FUNC_MAX_ARGS]; /* Parameter lengths (binary io) */
+ int param_formats[FUNC_MAX_ARGS]; /* Parameter formats (binary io) */
} ProxyConnection;
/* Info about one cluster */
int ret_cur_conn; /* Result walking: index of current conn */
int ret_cur_pos; /* Result walking: index of current row */
int ret_total; /* Result walking: total rows left */
+
+ bool busy; /* True if the cluster is already involved in execution */
} ProxyCluster;
/*
bool has_send; /* Has binary output */
bool has_recv; /* Has binary input */
bool by_value; /* False if Datum is a pointer to data */
+ char alignment; /* Type alignment */
+ bool is_array; /* True if array */
+ Oid elem_type; /* Array element type */
+ short length; /* Type length */
/* I/O functions */
union
void *plan; /* Optional prepared plan for local queries */
} ProxyQuery;
+/*
+ * Deconstructed array parameters
+ *
+ * Built by make_datum_array() from an array argument. When the argument
+ * is NULL the array is not deconstructed and values, nulls and elem_count
+ * keep their zero-initialized state.
+ */
+typedef struct DatumArray
+{
+ ProxyType *type;	/* type info of the array elements */
+ Datum *values;	/* element values, as filled in by deconstruct_array() */
+ bool *nulls;	/* per-element NULL flags, parallel to values */
+ int elem_count;	/* number of elements in values/nulls */
+} DatumArray;
+
/*
* Complete info about compiled function.
*
char **arg_names; /* Argument names, may contain NULLs */
short arg_count; /* Argument count of proxy function */
+ bool *split_args; /* Map of arguments to split */
+
/* if the function returns untyped RECORD that needs AS clause */
bool dynamic_record;
void plproxy_function_cache_init(void);
void *plproxy_func_alloc(ProxyFunction *func, int size);
char *plproxy_func_strdup(ProxyFunction *func, const char *s);
+int plproxy_get_parameter_index(ProxyFunction *func, const char *ident);
+bool plproxy_split_add_ident(ProxyFunction *func, const char *ident);
ProxyFunction *plproxy_compile(FunctionCallInfo fcinfo, bool validate);
/* execute.c */
bool plproxy_query_add_ident(QueryBuffer *q, const char *ident);
ProxyQuery *plproxy_query_finish(QueryBuffer *q);
ProxyQuery *plproxy_standard_query(ProxyFunction *func, bool add_types);
-void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
-void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
+void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support);
+void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q,
+ DatumArray **array_params, int array_row);
void plproxy_query_freeplan(ProxyQuery *q);
#endif
fn_idx = -1,
sql_idx = -1;
- if (ident[0] == '$')
- {
- fn_idx = atoi(ident + 1) - 1;
- if (fn_idx < 0 || fn_idx >= q->func->arg_count)
- return false;
- }
- else if (q->func->arg_names)
- {
- for (i = 0; i < q->func->arg_count; i++)
- {
- if (!q->func->arg_names[i])
- continue;
- if (pg_strcasecmp(ident, q->func->arg_names[i]) == 0)
- {
- fn_idx = i;
- break;
- }
- }
- }
+ fn_idx = plproxy_get_parameter_index(q->func, ident);
+
if (fn_idx >= 0)
{
for (i = 0; i < q->arg_count; i++)
add_ref(q->sql, sql_idx, q->func, fn_idx, q->add_types);
}
else
+ {
+ if (ident[0] == '$')
+ return false;
appendStringInfoString(q->sql, ident);
+ }
+
return true;
}
len = q->arg_count * sizeof(int);
pq->arg_lookup = palloc(len);
pq->plan = NULL;
+
memcpy(pq->arg_lookup, q->arg_lookup, len);
MemoryContextSwitchTo(old);
* Prepare ProxyQuery for local execution
*/
void
-plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support)
{
int i;
Oid types[FUNC_MAX_ARGS];
{
int idx = q->arg_lookup[i];
- types[i] = func->arg_types[idx]->type_oid;
+ if (split_support && IS_SPLIT_ARG(func, idx))
+ /* for SPLIT arguments use array element type instead */
+ types[i] = func->arg_types[idx]->elem_type;
+ else
+ types[i] = func->arg_types[idx]->type_oid;
}
/* prepare & store plan */
* Result will be in SPI_tuptable.
*/
void
-plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q,
+ DatumArray **array_params, int array_row)
{
int i,
idx,
err;
- ProxyType *type;
char arg_nulls[FUNC_MAX_ARGS];
Datum arg_values[FUNC_MAX_ARGS];
for (i = 0; i < q->arg_count; i++)
{
idx = q->arg_lookup[i];
- type = func->arg_types[idx];
+
if (PG_ARGISNULL(idx))
{
arg_nulls[i] = 'n';
arg_values[i] = (Datum) NULL;
}
+ else if (array_params && IS_SPLIT_ARG(func, idx))
+ {
+ DatumArray *ats = array_params[idx];
+
+ arg_nulls[i] = ats->nulls[array_row] ? 'n' : ' ';
+ arg_values[i] = ats->nulls[array_row] ? (Datum) NULL : ats->values[array_row];
+ }
else
{
arg_nulls[i] = ' ';
on { return ON; }
all { return ALL; }
any { return ANY; }
+split { return SPLIT; }
select { BEGIN(sql); yylval.str = yytext; return SELECT; }
/* function call */
type->for_send = for_send;
type->by_value = s_type->typbyval;
type->name = plproxy_func_strdup(func, namebuf);
+ type->is_array = (s_type->typelem != 0 && s_type->typlen == -1);
+ type->elem_type = s_type->typelem;
+ type->alignment = s_type->typalign;
+ type->length = s_type->typlen;
/* decide what function is needed */
if (for_send)