summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cluster.c2
-rw-r--r--src/execute.c356
-rw-r--r--src/function.c63
-rw-r--r--src/main.c4
-rw-r--r--src/parser.y24
-rw-r--r--src/plproxy.h50
-rw-r--r--src/query.c48
-rw-r--r--src/scanner.l1
-rw-r--r--src/type.c4
9 files changed, 439 insertions, 113 deletions
diff --git a/src/cluster.c b/src/cluster.c
index f314ee5..3feb063 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -406,7 +406,7 @@ resolve_query(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *query)
HeapTuple row;
TupleDesc desc;
- plproxy_query_exec(func, fcinfo, query);
+ plproxy_query_exec(func, fcinfo, query, NULL, 0);
if (SPI_processed != 1)
plproxy_error(func, "'%s' returned %d rows, expected 1",
diff --git a/src/execute.c b/src/execute.c
index f28114a..b8517e9 100644
--- a/src/execute.c
+++ b/src/execute.c
@@ -439,7 +439,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
/* decide what to do */
@@ -482,7 +482,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
switch (conn->state)
@@ -545,33 +545,32 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn
/* Run the query on all tagged connections in parallel */
static void
-remote_execute(ProxyFunction *func,
- const char **values, int *plengths, int *pformats)
+remote_execute(ProxyFunction *func)
{
ExecStatusType err;
ProxyConnection *conn;
ProxyCluster *cluster = func->cur_cluster;
int i,
- pending;
+ pending = 0;
struct timeval now;
/* either launch connection or send query */
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
/* check if conn is alive, and launch if not */
prepare_conn(func, conn);
+ pending++;
/* if conn is ready, then send query away */
if (conn->state == C_READY)
- send_query(func, conn, values, plengths, pformats);
+ send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
}
/* now loop until all results are arrived */
- pending = 1;
while (pending)
{
/* allow postgres to cancel processing */
@@ -587,12 +586,12 @@ remote_execute(ProxyFunction *func,
for (i = 0; i < cluster->conn_count; i++)
{
conn = &cluster->conn_list[i];
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
/* login finished, send query */
if (conn->state == C_READY)
- send_query(func, conn, values, plengths, pformats);
+ send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats);
if (conn->state != C_DONE)
pending++;
@@ -606,11 +605,11 @@ remote_execute(ProxyFunction *func,
{
conn = &cluster->conn_list[i];
- if ((conn->run_on || conn->res)
- && !(conn->run_on && conn->res))
- plproxy_error(func, "run_on does not match res");
+ if ((conn->run_tag || conn->res)
+ && !(conn->run_tag && conn->res))
+ plproxy_error(func, "run_tag does not match res");
- if (!conn->run_on)
+ if (!conn->run_tag)
continue;
if (conn->state != C_DONE)
@@ -661,9 +660,14 @@ remote_cancel(ProxyFunction *func)
}
}
-/* Run hash function and tag connections */
+/*
+ * Run hash function and tag connections. If any of the hash function
+ * arguments are mentioned in the split_arrays an element of the array
+ * is used instead of the actual array.
+ */
static void
-tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
+ DatumArray **array_params, int array_row)
{
int i;
TupleDesc desc;
@@ -671,7 +675,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
ProxyCluster *cluster = func->cur_cluster;
/* execute cached plan */
- plproxy_query_exec(func, fcinfo, func->hash_sql);
+ plproxy_query_exec(func, fcinfo, func->hash_sql, array_params, array_row);
/* get header */
desc = SPI_tuptable->tupdesc;
@@ -698,7 +702,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
plproxy_error(func, "Hash result must be int2, int4 or int8");
hashval &= cluster->part_mask;
- cluster->part_map[hashval]->run_on = 1;
+ cluster->part_map[hashval]->run_tag = tag;
}
/* sanity check */
@@ -708,109 +712,305 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
" allows hashcount <> 1");
}
-/* Clean old results and prepare for new one */
-void
-plproxy_clean_results(ProxyCluster *cluster)
+/*
+ * Deconstruct an array type to array of Datums, note NULL elements
+ * and determine the element type information.
+ */
+static DatumArray *
+make_datum_array(ProxyFunction *func, ArrayType *v, Oid elem_type)
{
- int i;
- ProxyConnection *conn;
+ DatumArray *da = palloc0(sizeof(*da));
- if (!cluster)
- return;
-
- cluster->ret_total = 0;
- cluster->ret_cur_conn = 0;
+ da->type = plproxy_find_type_info(func, elem_type, true);
- for (i = 0; i < cluster->conn_count; i++)
- {
- conn = &cluster->conn_list[i];
- if (conn->res)
- {
- PQclear(conn->res);
- conn->res = NULL;
- }
- conn->pos = 0;
- conn->run_on = 0;
- }
- /* conn state checks are done in prepare_conn */
+ if (v)
+ deconstruct_array(v,
+ da->type->type_oid, da->type->length, da->type->by_value,
+ da->type->alignment,
+ &da->values, &da->nulls, &da->elem_count);
+ return da;
}
-/* Select partitions and execute query on them */
-void
-plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+/*
+ * Evaluate the run condition. Tag the matching connections with the specified
+ * tag.
+ *
+ * Note that we don't allow nested plproxy calls on the same cluster (ie.
+ * remote hash functions). The cluster and connection state are global and
+ * would easily get messed up.
+ */
+static void
+tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag,
+ DatumArray **array_params, int array_row)
{
- const char *values[FUNC_MAX_ARGS];
- int plengths[FUNC_MAX_ARGS];
- int pformats[FUNC_MAX_ARGS];
- int i;
- int gotbin;
- ProxyCluster *cluster = func->cur_cluster;
-
- /* clean old results */
- plproxy_clean_results(cluster);
+ ProxyCluster *cluster = func->cur_cluster;
+ int i;
- /* tag interesting partitions */
switch (func->run_type)
{
case R_HASH:
- tag_hash_partitions(func, fcinfo);
+ tag_hash_partitions(func, fcinfo, tag, array_params, array_row);
break;
case R_ALL:
for (i = 0; i < cluster->part_count; i++)
- cluster->part_map[i]->run_on = 1;
+ cluster->part_map[i]->run_tag = tag;
break;
case R_EXACT:
i = func->exact_nr;
if (i < 0 || i >= cluster->part_count)
plproxy_error(func, "part number out of range");
- cluster->part_map[i]->run_on = 1;
+ cluster->part_map[i]->run_tag = tag;
break;
case R_ANY:
i = random() & cluster->part_mask;
- cluster->part_map[i]->run_on = 1;
+ cluster->part_map[i]->run_tag = tag;
break;
default:
plproxy_error(func, "uninitialized run_type");
}
+}
- /* prepare args */
- gotbin = 0;
- for (i = 0; i < func->remote_sql->arg_count; i++)
+/*
+ * Tag the partitions to be run on, if split is requested prepare the
+ * per-partition split array parameters.
+ *
+ * This is done by looping over all of the split arrays side-by-side, for each
+ * tuple see if it satisfies the RUN ON condition. If so, copy the tuple
+ * to the partition's private array parameters.
+ */
+static void
+prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+ int i, row, col;
+ int split_array_len = -1;
+ int split_array_count = 0;
+ ProxyCluster *cluster = func->cur_cluster;
+ DatumArray *arrays_to_split[FUNC_MAX_ARGS];
+
+ /*
+ * See if we have any arrays to split. If so, make them manageable by
+ * converting them to Datum arrays. During the process verify that all
+ * the arrays are of the same length.
+ */
+ for (i = 0; i < func->arg_count; i++)
{
- int idx = func->remote_sql->arg_lookup[i];
- plengths[i] = 0;
- pformats[i] = 0;
- if (PG_ARGISNULL(idx))
+ ArrayType *v;
+
+ if (!IS_SPLIT_ARG(func, i))
{
- values[i] = NULL;
+ arrays_to_split[i] = NULL;
+ continue;
}
+
+ if (PG_ARGISNULL(i))
+ v = NULL;
else
{
- bool bin = cluster->config.disable_binary ? 0 : 1;
+ v = PG_GETARG_ARRAYTYPE_P(i);
- values[i] = plproxy_send_type(func->arg_types[idx],
- PG_GETARG_DATUM(idx),
- bin,
- &plengths[i],
- &pformats[i]);
+ if (ARR_NDIM(v) > 1)
+ plproxy_error(func, "split multi-dimensional arrays are not supported");
+ }
+
+ arrays_to_split[i] = make_datum_array(func, v, func->arg_types[i]->elem_type);
+
+ /* Check that the element counts match */
+ if (split_array_len < 0)
+ split_array_len = arrays_to_split[i]->elem_count;
+ else if (arrays_to_split[i]->elem_count != split_array_len)
+ plproxy_error(func, "split arrays must be of identical lengths");
+
+ ++split_array_count;
+ }
- if (pformats[i])
- gotbin = 1;
+ /* If nothing to split, just tag the partitions and be done with it */
+ if (!split_array_count)
+ {
+ tag_run_on_partitions(func, fcinfo, 1, NULL, 0);
+ return;
+ }
+
+ /* Need to split, evaluate the RUN ON condition for each of the elements. */
+ for (row = 0; row < split_array_len; row++)
+ {
+ int part;
+ int my_tag = row+1;
+
+ /*
+ * Tag the run-on partitions with a tag that allows us us to identify
+ * which partitions need the set of elements from this row.
+ */
+ tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row);
+
+ /* Add the array elements to the partitions tagged in previous step */
+ for (part = 0; part < cluster->conn_count; part++)
+ {
+ ProxyConnection *conn = &cluster->conn_list[part];
+
+ if (conn->run_tag != my_tag)
+ continue;
+
+ if (!conn->bstate)
+ conn->bstate = palloc0(func->arg_count * sizeof(*conn->bstate));
+
+ /* Add this set of elements to the partition specific arrays */
+ for (col = 0; col < func->arg_count; col++)
+ {
+ if (!IS_SPLIT_ARG(func, col))
+ continue;
+
+ conn->bstate[col] = accumArrayResult(conn->bstate[col],
+ arrays_to_split[col]->values[row],
+ arrays_to_split[col]->nulls[row],
+ arrays_to_split[col]->type->type_oid,
+ CurrentMemoryContext);
+ }
}
}
/*
- * Run query. On cancel, send cancel request to partitions too.
+ * Finally, copy the accumulated arrays to the actual connections
+ * to be used as parameters.
+ */
+ for (i = 0; i < cluster->conn_count; i++)
+ {
+ ProxyConnection *conn = &cluster->conn_list[i];
+
+ if (!conn->run_tag)
+ continue;
+
+ conn->split_params = palloc(func->arg_count * sizeof(*conn->split_params));
+
+ for (col = 0; col < func->arg_count; col++)
+ {
+ if (!IS_SPLIT_ARG(func, col))
+ conn->split_params[col] = PointerGetDatum(NULL);
+ else
+ conn->split_params[col] = makeArrayResult(conn->bstate[col],
+ CurrentMemoryContext);
+ }
+ }
+}
+
+/*
+ * Prepare parameters for the query.
+ */
+static void
+prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+ int i;
+ ProxyCluster *cluster = func->cur_cluster;
+
+ for (i = 0; i < func->remote_sql->arg_count; i++)
+ {
+ int idx = func->remote_sql->arg_lookup[i];
+ bool bin = cluster->config.disable_binary ? 0 : 1;
+ const char *fixed_param_val = NULL;
+ int fixed_param_len, fixed_param_fmt;
+ int part;
+
+ /* Avoid doing multiple conversions for fixed parameters */
+ if (!IS_SPLIT_ARG(func, idx) && !PG_ARGISNULL(idx))
+ {
+ fixed_param_val = plproxy_send_type(func->arg_types[idx],
+ PG_GETARG_DATUM(idx),
+ bin,
+ &fixed_param_len,
+ &fixed_param_fmt);
+ }
+
+ /* Add the parameters to partitions */
+ for (part = 0; part < cluster->conn_count; part++)
+ {
+ ProxyConnection *conn = &cluster->conn_list[part];
+
+ if (!conn->run_tag)
+ continue;
+
+ if (PG_ARGISNULL(idx))
+ {
+ conn->param_values[i] = NULL;
+ conn->param_lengths[i] = 0;
+ conn->param_formats[i] = 0;
+ }
+ else
+ {
+ if (IS_SPLIT_ARG(func, idx))
+ {
+ conn->param_values[i] = plproxy_send_type(func->arg_types[idx],
+ conn->split_params[idx],
+ bin,
+ &conn->param_lengths[i],
+ &conn->param_formats[i]);
+ }
+ else
+ {
+ conn->param_values[i] = fixed_param_val;
+ conn->param_lengths[i] = fixed_param_len;
+ conn->param_formats[i] = fixed_param_fmt;
+ }
+ }
+ }
+ }
+}
+
+/* Clean old results and prepare for new one */
+void
+plproxy_clean_results(ProxyCluster *cluster)
+{
+ int i;
+ ProxyConnection *conn;
+
+ if (!cluster)
+ return;
+
+ cluster->ret_total = 0;
+ cluster->ret_cur_conn = 0;
+
+ for (i = 0; i < cluster->conn_count; i++)
+ {
+ conn = &cluster->conn_list[i];
+ if (conn->res)
+ {
+ PQclear(conn->res);
+ conn->res = NULL;
+ }
+ conn->pos = 0;
+ conn->run_tag = 0;
+ conn->bstate = NULL;
+ }
+ /* conn state checks are done in prepare_conn */
+}
+
+/* Select partitions and execute query on them */
+void
+plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+{
+ /*
+ * Prepare parameters and run query. On cancel, send cancel request to
+ * partitions too.
*/
PG_TRY();
{
- if (gotbin)
- remote_execute(func, values, plengths, pformats);
- else
- remote_execute(func, values, NULL, NULL);
+ func->cur_cluster->busy = true;
+
+ /* clean old results */
+ plproxy_clean_results(func->cur_cluster);
+
+ /* tag the partitions and prepare per-partition parameters */
+ prepare_and_tag_partitions(func, fcinfo);
+
+ /* prepare the target query parameters */
+ prepare_query_parameters(func, fcinfo);
+
+ remote_execute(func);
+
+ func->cur_cluster->busy = false;
}
PG_CATCH();
{
+ func->cur_cluster->busy = false;
+
if (geterrcode() == ERRCODE_QUERY_CANCELED)
remote_cancel(func);
PG_RE_THROW();
diff --git a/src/function.c b/src/function.c
index a948925..42a97e2 100644
--- a/src/function.c
+++ b/src/function.c
@@ -73,6 +73,63 @@ plproxy_func_strdup(ProxyFunction *func, const char *s)
return res;
}
+/* Find the index of a named parameter, -1 if not found */
+int
+plproxy_get_parameter_index(ProxyFunction *func, const char *ident)
+{
+ int i;
+
+ if (ident[0] == '$')
+ {
+ /* Probably a $# parameter reference */
+ i = atoi(ident + 1) - 1;
+ if (i >= 0 && i < func->arg_count)
+ return i;
+ }
+ else if (func->arg_names)
+ {
+ /* Named parameter, go through the argument names */
+ for (i = 0; i < func->arg_count; i++)
+ {
+ if (!func->arg_names[i])
+ continue;
+ if (pg_strcasecmp(ident, func->arg_names[i]) == 0)
+ return i;
+ }
+ }
+
+ return -1;
+}
+
+/* Add a new split argument */
+bool
+plproxy_split_add_ident(ProxyFunction *func, const char *ident)
+{
+ int argindex;
+
+ if ((argindex = plproxy_get_parameter_index(func, ident)) < 0)
+ return false;
+
+ /* Already split? */
+ if (IS_SPLIT_ARG(func, argindex))
+ plproxy_error(func, "SPLIT parameter specified more than once: %s", ident);
+
+ /* Is it an array? */
+ if (!func->arg_types[argindex]->is_array)
+ plproxy_error(func, "SPLIT parameter is not an array: %s", ident);
+
+ if (!func->split_args)
+ {
+ size_t alloc_size = sizeof(*func->split_args) * func->arg_count;
+
+ func->split_args = plproxy_func_alloc(func, alloc_size);
+ MemSet(func->split_args, 0, alloc_size);
+ }
+
+ func->split_args[argindex] = true;
+
+ return true;
+}
/* Initialize PL/Proxy function cache */
void
@@ -413,11 +470,11 @@ fn_compile(FunctionCallInfo fcinfo,
/* prepare local queries */
if (f->cluster_sql)
- plproxy_query_prepare(f, fcinfo, f->cluster_sql);
+ plproxy_query_prepare(f, fcinfo, f->cluster_sql, false);
if (f->hash_sql)
- plproxy_query_prepare(f, fcinfo, f->hash_sql);
+ plproxy_query_prepare(f, fcinfo, f->hash_sql, true);
if (f->connect_sql)
- plproxy_query_prepare(f, fcinfo, f->connect_sql);
+ plproxy_query_prepare(f, fcinfo, f->connect_sql, false);
/* sanity check */
if (f->run_type == R_ALL && !fcinfo->flinfo->fn_retset)
diff --git a/src/main.c b/src/main.c
index d0a3672..36441d9 100644
--- a/src/main.c
+++ b/src/main.c
@@ -140,6 +140,10 @@ compile_and_execute(FunctionCallInfo fcinfo)
/* get actual cluster to run on */
cluster = plproxy_find_cluster(func, fcinfo);
+ /* Don't allow nested calls on the same cluster */
+ if (cluster->busy)
+ plproxy_error(func, "Nested PL/Proxy calls to the same cluster are not supported.");
+
/* fetch PGresults */
func->cur_cluster = cluster;
plproxy_exec(func, fcinfo);
diff --git a/src/parser.y b/src/parser.y
index f6c331a..987ef7a 100644
--- a/src/parser.y
+++ b/src/parser.y
@@ -35,7 +35,7 @@ void plproxy_yy_scan_bytes(const char *bytes, int len);
static ProxyFunction *xfunc;
/* remember what happened */
-static int got_run, got_cluster, got_connect;
+static int got_run, got_cluster, got_connect, got_split;
static QueryBuffer *cluster_sql;
static QueryBuffer *select_sql;
@@ -48,7 +48,7 @@ static QueryBuffer *cur_sql;
/* keep the resetting code together with variables */
static void reset_parser_vars(void)
{
- got_run = got_cluster = got_connect = 0;
+ got_run = got_cluster = got_connect = got_split = 0;
cur_sql = select_sql = cluster_sql = hash_sql = connect_sql = NULL;
xfunc = NULL;
}
@@ -58,7 +58,7 @@ static void reset_parser_vars(void)
%name-prefix="plproxy_yy"
%token <str> CONNECT CLUSTER RUN ON ALL ANY SELECT
-%token <str> IDENT NUMBER FNCALL STRING
+%token <str> IDENT NUMBER FNCALL SPLIT STRING
%token <str> SQLIDENT SQLPART
%union
@@ -70,7 +70,7 @@ static void reset_parser_vars(void)
body: | body stmt ;
-stmt: cluster_stmt | run_stmt | select_stmt | connect_stmt ;
+stmt: cluster_stmt | split_stmt | run_stmt | select_stmt | connect_stmt ;
connect_stmt: CONNECT connect_spec ';' {
if (got_connect)
@@ -117,6 +117,22 @@ cluster_func: FNCALL { cluster_sql = plproxy_query_start(xfunc, false);
cluster_name: STRING { xfunc->cluster_name = plproxy_func_strdup(xfunc, $1); }
;
+split_stmt: SPLIT split_param_list ';' {
+ if (got_split)
+ yyerror("Only one SPLIT statement allowed");
+ got_split = 1;
+ }
+ ;
+
+split_param_list: split_param
+ | split_param_list ',' split_param
+ ;
+
+split_param: IDENT {
+ if (!plproxy_split_add_ident(xfunc, $1))
+ yyerror("invalid argument reference: %s", $1);
+ }
+
run_stmt: RUN ON run_spec ';' { if (got_run)
yyerror("Only one RUN statement allowed");
got_run = 1; }
diff --git a/src/plproxy.h b/src/plproxy.h
index 37d97fd..8b8d20e 100644
--- a/src/plproxy.h
+++ b/src/plproxy.h
@@ -36,6 +36,7 @@
#include <commands/trigger.h>
#include <mb/pg_wchar.h>
#include <miscadmin.h>
+#include <utils/array.h>
#include <utils/builtins.h>
#include <utils/hsearch.h>
#include <utils/lsyscache.h>
@@ -65,6 +66,11 @@
/*
+ * Determine if this argument is to SPLIT
+ */
+#define IS_SPLIT_ARG(func, arg) ((func)->split_args && (func)->split_args[arg])
+
+/*
* Maintenece period in seconds. Connnections will be freed
* from stale results, and checked for lifetime.
*/
@@ -118,9 +124,25 @@ typedef struct
ConnState state; /* Connection state */
time_t connect_time; /* When connection was started */
time_t query_time; /* When last query was sent */
- bool run_on; /* True it this connection should be used */
bool same_ver; /* True if dest backend has same X.Y ver */
bool tuning; /* True if tuning query is running on conn */
+
+ /*
+ * Nonzero if this connection should be used. The actual tag value is only
+ * used by SPLIT processing, others should treat it as a boolean value.
+ */
+ int run_tag;
+
+ /*
+ * Per-connection parameters. These are a assigned just before the
+ * remote call is made.
+ */
+
+ Datum *split_params; /* Split array parameters */
+ ArrayBuildState **bstate; /* Temporary build state */
+ const char *param_values[FUNC_MAX_ARGS]; /* Parameter values */
+ int param_lengths[FUNC_MAX_ARGS]; /* Parameter lengths (binary io) */
+ int param_formats[FUNC_MAX_ARGS]; /* Parameter formats (binary io) */
} ProxyConnection;
/* Info about one cluster */
@@ -142,6 +164,8 @@ typedef struct ProxyCluster
int ret_cur_conn; /* Result walking: index of current conn */
int ret_cur_pos; /* Result walking: index of current row */
int ret_total; /* Result walking: total rows left */
+
+ bool busy; /* True if the cluster is already involved in execution */
} ProxyCluster;
/*
@@ -161,6 +185,10 @@ typedef struct ProxyType
bool has_send; /* Has binary output */
bool has_recv; /* Has binary input */
bool by_value; /* False if Datum is a pointer to data */
+ char alignment; /* Type alignment */
+ bool is_array; /* True if array */
+ Oid elem_type; /* Array element type */
+ short length; /* Type length */
/* I/O functions */
union
@@ -208,6 +236,17 @@ typedef struct ProxyQuery
} ProxyQuery;
/*
+ * Deconstructed array parameters
+ */
+typedef struct DatumArray
+{
+ ProxyType *type;
+ Datum *values;
+ bool *nulls;
+ int elem_count;
+} DatumArray;
+
+/*
* Complete info about compiled function.
*
* Note: only IN and INOUT arguments are cached here.
@@ -224,6 +263,8 @@ typedef struct ProxyFunction
char **arg_names; /* Argument names, may contain NULLs */
short arg_count; /* Argument count of proxy function */
+ bool *split_args; /* Map of arguments to split */
+
/* if the function returns untyped RECORD that needs AS clause */
bool dynamic_record;
@@ -272,6 +313,8 @@ void plproxy_error(ProxyFunction *func, const char *fmt,...);
void plproxy_function_cache_init(void);
void *plproxy_func_alloc(ProxyFunction *func, int size);
char *plproxy_func_strdup(ProxyFunction *func, const char *s);
+int plproxy_get_parameter_index(ProxyFunction *func, const char *ident);
+bool plproxy_split_add_ident(ProxyFunction *func, const char *ident);
ProxyFunction *plproxy_compile(FunctionCallInfo fcinfo, bool validate);
/* execute.c */
@@ -312,8 +355,9 @@ bool plproxy_query_add_const(QueryBuffer *q, const char *data);
bool plproxy_query_add_ident(QueryBuffer *q, const char *ident);
ProxyQuery *plproxy_query_finish(QueryBuffer *q);
ProxyQuery *plproxy_standard_query(ProxyFunction *func, bool add_types);
-void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
-void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
+void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support);
+void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q,
+ DatumArray **array_params, int array_row);
void plproxy_query_freeplan(ProxyQuery *q);
#endif
diff --git a/src/query.c b/src/query.c
index c734bf0..d0e0577 100644
--- a/src/query.c
+++ b/src/query.c
@@ -90,25 +90,8 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident)
fn_idx = -1,
sql_idx = -1;
- if (ident[0] == '$')
- {
- fn_idx = atoi(ident + 1) - 1;
- if (fn_idx < 0 || fn_idx >= q->func->arg_count)
- return false;
- }
- else if (q->func->arg_names)
- {
- for (i = 0; i < q->func->arg_count; i++)
- {
- if (!q->func->arg_names[i])
- continue;
- if (pg_strcasecmp(ident, q->func->arg_names[i]) == 0)
- {
- fn_idx = i;
- break;
- }
- }
- }
+ fn_idx = plproxy_get_parameter_index(q->func, ident);
+
if (fn_idx >= 0)
{
for (i = 0; i < q->arg_count; i++)
@@ -127,7 +110,12 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident)
add_ref(q->sql, sql_idx, q->func, fn_idx, q->add_types);
}
else
+ {
+ if (ident[0] == '$')
+ return false;
appendStringInfoString(q->sql, ident);
+ }
+
return true;
}
@@ -149,6 +137,7 @@ plproxy_query_finish(QueryBuffer *q)
len = q->arg_count * sizeof(int);
pq->arg_lookup = palloc(len);
pq->plan = NULL;
+
memcpy(pq->arg_lookup, q->arg_lookup, len);
MemoryContextSwitchTo(old);
@@ -247,7 +236,7 @@ plproxy_standard_query(ProxyFunction *func, bool add_types)
* Prepare ProxyQuery for local execution
*/
void
-plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support)
{
int i;
Oid types[FUNC_MAX_ARGS];
@@ -258,7 +247,11 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *
{
int idx = q->arg_lookup[i];
- types[i] = func->arg_types[idx]->type_oid;
+ if (split_support && IS_SPLIT_ARG(func, idx))
+ /* for SPLIT arguments use array element type instead */
+ types[i] = func->arg_types[idx]->elem_type;
+ else
+ types[i] = func->arg_types[idx]->type_oid;
}
/* prepare & store plan */
@@ -272,12 +265,12 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *
* Result will be in SPI_tuptable.
*/
void
-plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q,
+ DatumArray **array_params, int array_row)
{
int i,
idx,
err;
- ProxyType *type;
char arg_nulls[FUNC_MAX_ARGS];
Datum arg_values[FUNC_MAX_ARGS];
@@ -285,12 +278,19 @@ plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
for (i = 0; i < q->arg_count; i++)
{
idx = q->arg_lookup[i];
- type = func->arg_types[idx];
+
if (PG_ARGISNULL(idx))
{
arg_nulls[i] = 'n';
arg_values[i] = (Datum) NULL;
}
+ else if (array_params && IS_SPLIT_ARG(func, idx))
+ {
+ DatumArray *ats = array_params[idx];
+
+ arg_nulls[i] = ats->nulls[array_row] ? 'n' : ' ';
+ arg_values[i] = ats->nulls[array_row] ? (Datum) NULL : ats->values[array_row];
+ }
else
{
arg_nulls[i] = ' ';
diff --git a/src/scanner.l b/src/scanner.l
index cb203b7..36166c9 100644
--- a/src/scanner.l
+++ b/src/scanner.l
@@ -179,6 +179,7 @@ run { return RUN; }
on { return ON; }
all { return ALL; }
any { return ANY; }
+split { return SPLIT; }
select { BEGIN(sql); yylval.str = yytext; return SELECT; }
/* function call */
diff --git a/src/type.c b/src/type.c
index dd7165d..3679d83 100644
--- a/src/type.c
+++ b/src/type.c
@@ -253,6 +253,10 @@ plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send)
type->for_send = for_send;
type->by_value = s_type->typbyval;
type->name = plproxy_func_strdup(func, namebuf);
+ type->is_array = (s_type->typelem != 0 && s_type->typlen == -1);
+ type->elem_type = s_type->typelem;
+ type->alignment = s_type->typalign;
+ type->length = s_type->typlen;
/* decide what function is needed */
if (for_send)