diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/cluster.c | 2 | ||||
| -rw-r--r-- | src/execute.c | 356 | ||||
| -rw-r--r-- | src/function.c | 63 | ||||
| -rw-r--r-- | src/main.c | 4 | ||||
| -rw-r--r-- | src/parser.y | 24 | ||||
| -rw-r--r-- | src/plproxy.h | 50 | ||||
| -rw-r--r-- | src/query.c | 48 | ||||
| -rw-r--r-- | src/scanner.l | 1 | ||||
| -rw-r--r-- | src/type.c | 4 |
9 files changed, 439 insertions, 113 deletions
diff --git a/src/cluster.c b/src/cluster.c index f314ee5..3feb063 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -406,7 +406,7 @@ resolve_query(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *query) HeapTuple row; TupleDesc desc; - plproxy_query_exec(func, fcinfo, query); + plproxy_query_exec(func, fcinfo, query, NULL, 0); if (SPI_processed != 1) plproxy_error(func, "'%s' returned %d rows, expected 1", diff --git a/src/execute.c b/src/execute.c index f28114a..b8517e9 100644 --- a/src/execute.c +++ b/src/execute.c @@ -439,7 +439,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; /* decide what to do */ @@ -482,7 +482,7 @@ poll_conns(ProxyFunction *func, ProxyCluster *cluster) for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; switch (conn->state) @@ -545,33 +545,32 @@ check_timeouts(ProxyFunction *func, ProxyCluster *cluster, ProxyConnection *conn /* Run the query on all tagged connections in parallel */ static void -remote_execute(ProxyFunction *func, - const char **values, int *plengths, int *pformats) +remote_execute(ProxyFunction *func) { ExecStatusType err; ProxyConnection *conn; ProxyCluster *cluster = func->cur_cluster; int i, - pending; + pending = 0; struct timeval now; /* either launch connection or send query */ for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; /* check if conn is alive, and launch if not */ prepare_conn(func, conn); + pending++; /* if conn is ready, then send query away */ if (conn->state == C_READY) - send_query(func, conn, values, plengths, pformats); + send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats); } /* now loop until all results are arrived */ - pending = 1; while (pending) { /* allow postgres to cancel processing */ @@ -587,12 +586,12 @@ remote_execute(ProxyFunction *func, for (i = 0; i < cluster->conn_count; i++) { conn = &cluster->conn_list[i]; - if (!conn->run_on) + if (!conn->run_tag) continue; /* login finished, send query */ if (conn->state == C_READY) - send_query(func, conn, values, plengths, pformats); + send_query(func, conn, conn->param_values, conn->param_lengths, conn->param_formats); if (conn->state != C_DONE) pending++; @@ -606,11 +605,11 @@ remote_execute(ProxyFunction *func, { conn = &cluster->conn_list[i]; - if ((conn->run_on || conn->res) - && !(conn->run_on && conn->res)) - plproxy_error(func, "run_on does not match res"); + if ((conn->run_tag || conn->res) + && !(conn->run_tag && conn->res)) + plproxy_error(func, "run_tag does not match res"); - if (!conn->run_on) + if (!conn->run_tag) continue; if (conn->state != C_DONE) @@ -661,9 +660,14 @@ remote_cancel(ProxyFunction *func) } } -/* Run hash function and tag connections */ +/* + * Run hash function and tag connections. If any of the hash function + * arguments are mentioned in the split_arrays an element of the array + * is used instead of the actual array. + */ static void -tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) +tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag, + DatumArray **array_params, int array_row) { int i; TupleDesc desc; @@ -671,7 +675,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) ProxyCluster *cluster = func->cur_cluster; /* execute cached plan */ - plproxy_query_exec(func, fcinfo, func->hash_sql); + plproxy_query_exec(func, fcinfo, func->hash_sql, array_params, array_row); /* get header */ desc = SPI_tuptable->tupdesc; @@ -698,7 +702,7 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) plproxy_error(func, "Hash result must be int2, int4 or int8"); hashval &= cluster->part_mask; - cluster->part_map[hashval]->run_on = 1; + cluster->part_map[hashval]->run_tag = tag; } /* sanity check */ @@ -708,109 +712,305 @@ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) " allows hashcount <> 1"); } -/* Clean old results and prepare for new one */ -void -plproxy_clean_results(ProxyCluster *cluster) +/* + * Deconstruct an array type to array of Datums, note NULL elements + * and determine the element type information. + */ +static DatumArray * +make_datum_array(ProxyFunction *func, ArrayType *v, Oid elem_type) { - int i; - ProxyConnection *conn; + DatumArray *da = palloc0(sizeof(*da)); - if (!cluster) - return; - - cluster->ret_total = 0; - cluster->ret_cur_conn = 0; + da->type = plproxy_find_type_info(func, elem_type, true); - for (i = 0; i < cluster->conn_count; i++) - { - conn = &cluster->conn_list[i]; - if (conn->res) - { - PQclear(conn->res); - conn->res = NULL; - } - conn->pos = 0; - conn->run_on = 0; - } - /* conn state checks are done in prepare_conn */ + if (v) + deconstruct_array(v, + da->type->type_oid, da->type->length, da->type->by_value, + da->type->alignment, + &da->values, &da->nulls, &da->elem_count); + return da; } -/* Select partitions and execute query on them */ -void -plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo) +/* + * Evaluate the run condition. Tag the matching connections with the specified + * tag. + * + * Note that we don't allow nested plproxy calls on the same cluster (ie. + * remote hash functions). The cluster and connection state are global and + * would easily get messed up. + */ +static void +tag_run_on_partitions(ProxyFunction *func, FunctionCallInfo fcinfo, int tag, + DatumArray **array_params, int array_row) { - const char *values[FUNC_MAX_ARGS]; - int plengths[FUNC_MAX_ARGS]; - int pformats[FUNC_MAX_ARGS]; - int i; - int gotbin; - ProxyCluster *cluster = func->cur_cluster; - - /* clean old results */ - plproxy_clean_results(cluster); + ProxyCluster *cluster = func->cur_cluster; + int i; - /* tag interesting partitions */ switch (func->run_type) { case R_HASH: - tag_hash_partitions(func, fcinfo); + tag_hash_partitions(func, fcinfo, tag, array_params, array_row); break; case R_ALL: for (i = 0; i < cluster->part_count; i++) - cluster->part_map[i]->run_on = 1; + cluster->part_map[i]->run_tag = tag; break; case R_EXACT: i = func->exact_nr; if (i < 0 || i >= cluster->part_count) plproxy_error(func, "part number out of range"); - cluster->part_map[i]->run_on = 1; + cluster->part_map[i]->run_tag = tag; break; case R_ANY: i = random() & cluster->part_mask; - cluster->part_map[i]->run_on = 1; + cluster->part_map[i]->run_tag = tag; break; default: plproxy_error(func, "uninitialized run_type"); } +} - /* prepare args */ - gotbin = 0; - for (i = 0; i < func->remote_sql->arg_count; i++) +/* + * Tag the partitions to be run on, if split is requested prepare the + * per-partition split array parameters. + * + * This is done by looping over all of the split arrays side-by-side, for each + * tuple see if it satisfies the RUN ON condition. If so, copy the tuple + * to the partition's private array parameters. + */ +static void +prepare_and_tag_partitions(ProxyFunction *func, FunctionCallInfo fcinfo) +{ + int i, row, col; + int split_array_len = -1; + int split_array_count = 0; + ProxyCluster *cluster = func->cur_cluster; + DatumArray *arrays_to_split[FUNC_MAX_ARGS]; + + /* + * See if we have any arrays to split. If so, make them manageable by + * converting them to Datum arrays. During the process verify that all + * the arrays are of the same length. + */ + for (i = 0; i < func->arg_count; i++) { - int idx = func->remote_sql->arg_lookup[i]; - plengths[i] = 0; - pformats[i] = 0; - if (PG_ARGISNULL(idx)) + ArrayType *v; + + if (!IS_SPLIT_ARG(func, i)) { - values[i] = NULL; + arrays_to_split[i] = NULL; + continue; } + + if (PG_ARGISNULL(i)) + v = NULL; else { - bool bin = cluster->config.disable_binary ? 0 : 1; + v = PG_GETARG_ARRAYTYPE_P(i); - values[i] = plproxy_send_type(func->arg_types[idx], - PG_GETARG_DATUM(idx), - bin, - &plengths[i], - &pformats[i]); + if (ARR_NDIM(v) > 1) + plproxy_error(func, "split multi-dimensional arrays are not supported"); + } + + arrays_to_split[i] = make_datum_array(func, v, func->arg_types[i]->elem_type); + + /* Check that the element counts match */ + if (split_array_len < 0) + split_array_len = arrays_to_split[i]->elem_count; + else if (arrays_to_split[i]->elem_count != split_array_len) + plproxy_error(func, "split arrays must be of identical lengths"); + + ++split_array_count; + } - if (pformats[i]) - gotbin = 1; + /* If nothing to split, just tag the partitions and be done with it */ + if (!split_array_count) + { + tag_run_on_partitions(func, fcinfo, 1, NULL, 0); + return; + } + + /* Need to split, evaluate the RUN ON condition for each of the elements. */ + for (row = 0; row < split_array_len; row++) + { + int part; + int my_tag = row+1; + + /* + * Tag the run-on partitions with a tag that allows us us to identify + * which partitions need the set of elements from this row. + */ + tag_run_on_partitions(func, fcinfo, my_tag, arrays_to_split, row); + + /* Add the array elements to the partitions tagged in previous step */ + for (part = 0; part < cluster->conn_count; part++) + { + ProxyConnection *conn = &cluster->conn_list[part]; + + if (conn->run_tag != my_tag) + continue; + + if (!conn->bstate) + conn->bstate = palloc0(func->arg_count * sizeof(*conn->bstate)); + + /* Add this set of elements to the partition specific arrays */ + for (col = 0; col < func->arg_count; col++) + { + if (!IS_SPLIT_ARG(func, col)) + continue; + + conn->bstate[col] = accumArrayResult(conn->bstate[col], + arrays_to_split[col]->values[row], + arrays_to_split[col]->nulls[row], + arrays_to_split[col]->type->type_oid, + CurrentMemoryContext); + } } } /* - * Run query. On cancel, send cancel request to partitions too. + * Finally, copy the accumulated arrays to the actual connections + * to be used as parameters. + */ + for (i = 0; i < cluster->conn_count; i++) + { + ProxyConnection *conn = &cluster->conn_list[i]; + + if (!conn->run_tag) + continue; + + conn->split_params = palloc(func->arg_count * sizeof(*conn->split_params)); + + for (col = 0; col < func->arg_count; col++) + { + if (!IS_SPLIT_ARG(func, col)) + conn->split_params[col] = PointerGetDatum(NULL); + else + conn->split_params[col] = makeArrayResult(conn->bstate[col], + CurrentMemoryContext); + } + } +} + +/* + * Prepare parameters for the query. + */ +static void +prepare_query_parameters(ProxyFunction *func, FunctionCallInfo fcinfo) +{ + int i; + ProxyCluster *cluster = func->cur_cluster; + + for (i = 0; i < func->remote_sql->arg_count; i++) + { + int idx = func->remote_sql->arg_lookup[i]; + bool bin = cluster->config.disable_binary ? 0 : 1; + const char *fixed_param_val = NULL; + int fixed_param_len, fixed_param_fmt; + int part; + + /* Avoid doing multiple conversions for fixed parameters */ + if (!IS_SPLIT_ARG(func, idx) && !PG_ARGISNULL(idx)) + { + fixed_param_val = plproxy_send_type(func->arg_types[idx], + PG_GETARG_DATUM(idx), + bin, + &fixed_param_len, + &fixed_param_fmt); + } + + /* Add the parameters to partitions */ + for (part = 0; part < cluster->conn_count; part++) + { + ProxyConnection *conn = &cluster->conn_list[part]; + + if (!conn->run_tag) + continue; + + if (PG_ARGISNULL(idx)) + { + conn->param_values[i] = NULL; + conn->param_lengths[i] = 0; + conn->param_formats[i] = 0; + } + else + { + if (IS_SPLIT_ARG(func, idx)) + { + conn->param_values[i] = plproxy_send_type(func->arg_types[idx], + conn->split_params[idx], + bin, + &conn->param_lengths[i], + &conn->param_formats[i]); + } + else + { + conn->param_values[i] = fixed_param_val; + conn->param_lengths[i] = fixed_param_len; + conn->param_formats[i] = fixed_param_fmt; + } + } + } + } +} + +/* Clean old results and prepare for new one */ +void +plproxy_clean_results(ProxyCluster *cluster) +{ + int i; + ProxyConnection *conn; + + if (!cluster) + return; + + cluster->ret_total = 0; + cluster->ret_cur_conn = 0; + + for (i = 0; i < cluster->conn_count; i++) + { + conn = &cluster->conn_list[i]; + if (conn->res) + { + PQclear(conn->res); + conn->res = NULL; + } + conn->pos = 0; + conn->run_tag = 0; + conn->bstate = NULL; + } + /* conn state checks are done in prepare_conn */ +} + +/* Select partitions and execute query on them */ +void +plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo) +{ + /* + * Prepare parameters and run query. On cancel, send cancel request to + * partitions too. */ PG_TRY(); { - if (gotbin) - remote_execute(func, values, plengths, pformats); - else - remote_execute(func, values, NULL, NULL); + func->cur_cluster->busy = true; + + /* clean old results */ + plproxy_clean_results(func->cur_cluster); + + /* tag the partitions and prepare per-partition parameters */ + prepare_and_tag_partitions(func, fcinfo); + + /* prepare the target query parameters */ + prepare_query_parameters(func, fcinfo); + + remote_execute(func); + + func->cur_cluster->busy = false; } PG_CATCH(); { + func->cur_cluster->busy = false; + if (geterrcode() == ERRCODE_QUERY_CANCELED) remote_cancel(func); PG_RE_THROW(); diff --git a/src/function.c b/src/function.c index a948925..42a97e2 100644 --- a/src/function.c +++ b/src/function.c @@ -73,6 +73,63 @@ plproxy_func_strdup(ProxyFunction *func, const char *s) return res; } +/* Find the index of a named parameter, -1 if not found */ +int +plproxy_get_parameter_index(ProxyFunction *func, const char *ident) +{ + int i; + + if (ident[0] == '$') + { + /* Probably a $# parameter reference */ + i = atoi(ident + 1) - 1; + if (i >= 0 && i < func->arg_count) + return i; + } + else if (func->arg_names) + { + /* Named parameter, go through the argument names */ + for (i = 0; i < func->arg_count; i++) + { + if (!func->arg_names[i]) + continue; + if (pg_strcasecmp(ident, func->arg_names[i]) == 0) + return i; + } + } + + return -1; +} + +/* Add a new split argument */ +bool +plproxy_split_add_ident(ProxyFunction *func, const char *ident) +{ + int argindex; + + if ((argindex = plproxy_get_parameter_index(func, ident)) < 0) + return false; + + /* Already split? */ + if (IS_SPLIT_ARG(func, argindex)) + plproxy_error(func, "SPLIT parameter specified more than once: %s", ident); + + /* Is it an array? */ + if (!func->arg_types[argindex]->is_array) + plproxy_error(func, "SPLIT parameter is not an array: %s", ident); + + if (!func->split_args) + { + size_t alloc_size = sizeof(*func->split_args) * func->arg_count; + + func->split_args = plproxy_func_alloc(func, alloc_size); + MemSet(func->split_args, 0, alloc_size); + } + + func->split_args[argindex] = true; + + return true; +} /* Initialize PL/Proxy function cache */ void @@ -413,11 +470,11 @@ fn_compile(FunctionCallInfo fcinfo, /* prepare local queries */ if (f->cluster_sql) - plproxy_query_prepare(f, fcinfo, f->cluster_sql); + plproxy_query_prepare(f, fcinfo, f->cluster_sql, false); if (f->hash_sql) - plproxy_query_prepare(f, fcinfo, f->hash_sql); + plproxy_query_prepare(f, fcinfo, f->hash_sql, true); if (f->connect_sql) - plproxy_query_prepare(f, fcinfo, f->connect_sql); + plproxy_query_prepare(f, fcinfo, f->connect_sql, false); /* sanity check */ if (f->run_type == R_ALL && !fcinfo->flinfo->fn_retset) @@ -140,6 +140,10 @@ compile_and_execute(FunctionCallInfo fcinfo) /* get actual cluster to run on */ cluster = plproxy_find_cluster(func, fcinfo); + /* Don't allow nested calls on the same cluster */ + if (cluster->busy) + plproxy_error(func, "Nested PL/Proxy calls to the same cluster are not supported."); + /* fetch PGresults */ func->cur_cluster = cluster; plproxy_exec(func, fcinfo); diff --git a/src/parser.y b/src/parser.y index f6c331a..987ef7a 100644 --- a/src/parser.y +++ b/src/parser.y @@ -35,7 +35,7 @@ void plproxy_yy_scan_bytes(const char *bytes, int len); static ProxyFunction *xfunc; /* remember what happened */ -static int got_run, got_cluster, got_connect; +static int got_run, got_cluster, got_connect, got_split; static QueryBuffer *cluster_sql; static QueryBuffer *select_sql; @@ -48,7 +48,7 @@ static QueryBuffer *cur_sql; /* keep the resetting code together with variables */ static void reset_parser_vars(void) { - got_run = got_cluster = got_connect = 0; + got_run = got_cluster = got_connect = got_split = 0; cur_sql = select_sql = cluster_sql = hash_sql = connect_sql = NULL; xfunc = NULL; } @@ -58,7 +58,7 @@ static void reset_parser_vars(void) %name-prefix="plproxy_yy" %token <str> CONNECT CLUSTER RUN ON ALL ANY SELECT -%token <str> IDENT NUMBER FNCALL STRING +%token <str> IDENT NUMBER FNCALL SPLIT STRING %token <str> SQLIDENT SQLPART %union @@ -70,7 +70,7 @@ static void reset_parser_vars(void) body: | body stmt ; -stmt: cluster_stmt | run_stmt | select_stmt | connect_stmt ; +stmt: cluster_stmt | split_stmt | run_stmt | select_stmt | connect_stmt ; connect_stmt: CONNECT connect_spec ';' { if (got_connect) @@ -117,6 +117,22 @@ cluster_func: FNCALL { cluster_sql = plproxy_query_start(xfunc, false); cluster_name: STRING { xfunc->cluster_name = plproxy_func_strdup(xfunc, $1); } ; +split_stmt: SPLIT split_param_list ';' { + if (got_split) + yyerror("Only one SPLIT statement allowed"); + got_split = 1; + } + ; + +split_param_list: split_param + | split_param_list ',' split_param + ; + +split_param: IDENT { + if (!plproxy_split_add_ident(xfunc, $1)) + yyerror("invalid argument reference: %s", $1); + } + run_stmt: RUN ON run_spec ';' { if (got_run) yyerror("Only one RUN statement allowed"); got_run = 1; } diff --git a/src/plproxy.h b/src/plproxy.h index 37d97fd..8b8d20e 100644 --- a/src/plproxy.h +++ b/src/plproxy.h @@ -36,6 +36,7 @@ #include <commands/trigger.h> #include <mb/pg_wchar.h> #include <miscadmin.h> +#include <utils/array.h> #include <utils/builtins.h> #include <utils/hsearch.h> #include <utils/lsyscache.h> @@ -65,6 +66,11 @@ /* + * Determine if this argument is to SPLIT + */ +#define IS_SPLIT_ARG(func, arg) ((func)->split_args && (func)->split_args[arg]) + +/* * Maintenece period in seconds. Connnections will be freed * from stale results, and checked for lifetime. */ @@ -118,9 +124,25 @@ typedef struct ConnState state; /* Connection state */ time_t connect_time; /* When connection was started */ time_t query_time; /* When last query was sent */ - bool run_on; /* True it this connection should be used */ bool same_ver; /* True if dest backend has same X.Y ver */ bool tuning; /* True if tuning query is running on conn */ + + /* + * Nonzero if this connection should be used. The actual tag value is only + * used by SPLIT processing, others should treat it as a boolean value. + */ + int run_tag; + + /* + * Per-connection parameters. These are a assigned just before the + * remote call is made. + */ + + Datum *split_params; /* Split array parameters */ + ArrayBuildState **bstate; /* Temporary build state */ + const char *param_values[FUNC_MAX_ARGS]; /* Parameter values */ + int param_lengths[FUNC_MAX_ARGS]; /* Parameter lengths (binary io) */ + int param_formats[FUNC_MAX_ARGS]; /* Parameter formats (binary io) */ } ProxyConnection; /* Info about one cluster */ @@ -142,6 +164,8 @@ typedef struct ProxyCluster int ret_cur_conn; /* Result walking: index of current conn */ int ret_cur_pos; /* Result walking: index of current row */ int ret_total; /* Result walking: total rows left */ + + bool busy; /* True if the cluster is already involved in execution */ } ProxyCluster; /* @@ -161,6 +185,10 @@ typedef struct ProxyType bool has_send; /* Has binary output */ bool has_recv; /* Has binary input */ bool by_value; /* False if Datum is a pointer to data */ + char alignment; /* Type alignment */ + bool is_array; /* True if array */ + Oid elem_type; /* Array element type */ + short length; /* Type length */ /* I/O functions */ union @@ -208,6 +236,17 @@ typedef struct ProxyQuery } ProxyQuery; /* + * Deconstructed array parameters + */ +typedef struct DatumArray +{ + ProxyType *type; + Datum *values; + bool *nulls; + int elem_count; +} DatumArray; + +/* * Complete info about compiled function. * * Note: only IN and INOUT arguments are cached here. @@ -224,6 +263,8 @@ typedef struct ProxyFunction char **arg_names; /* Argument names, may contain NULLs */ short arg_count; /* Argument count of proxy function */ + bool *split_args; /* Map of arguments to split */ + /* if the function returns untyped RECORD that needs AS clause */ bool dynamic_record; @@ -272,6 +313,8 @@ void plproxy_error(ProxyFunction *func, const char *fmt,...); void plproxy_function_cache_init(void); void *plproxy_func_alloc(ProxyFunction *func, int size); char *plproxy_func_strdup(ProxyFunction *func, const char *s); +int plproxy_get_parameter_index(ProxyFunction *func, const char *ident); +bool plproxy_split_add_ident(ProxyFunction *func, const char *ident); ProxyFunction *plproxy_compile(FunctionCallInfo fcinfo, bool validate); /* execute.c */ @@ -312,8 +355,9 @@ bool plproxy_query_add_const(QueryBuffer *q, const char *data); bool plproxy_query_add_ident(QueryBuffer *q, const char *ident); ProxyQuery *plproxy_query_finish(QueryBuffer *q); ProxyQuery *plproxy_standard_query(ProxyFunction *func, bool add_types); -void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q); -void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q); +void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support); +void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, + DatumArray **array_params, int array_row); void plproxy_query_freeplan(ProxyQuery *q); #endif diff --git a/src/query.c b/src/query.c index c734bf0..d0e0577 100644 --- a/src/query.c +++ b/src/query.c @@ -90,25 +90,8 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident) fn_idx = -1, sql_idx = -1; - if (ident[0] == '$') - { - fn_idx = atoi(ident + 1) - 1; - if (fn_idx < 0 || fn_idx >= q->func->arg_count) - return false; - } - else if (q->func->arg_names) - { - for (i = 0; i < q->func->arg_count; i++) - { - if (!q->func->arg_names[i]) - continue; - if (pg_strcasecmp(ident, q->func->arg_names[i]) == 0) - { - fn_idx = i; - break; - } - } - } + fn_idx = plproxy_get_parameter_index(q->func, ident); + if (fn_idx >= 0) { for (i = 0; i < q->arg_count; i++) @@ -127,7 +110,12 @@ plproxy_query_add_ident(QueryBuffer *q, const char *ident) add_ref(q->sql, sql_idx, q->func, fn_idx, q->add_types); } else + { + if (ident[0] == '$') + return false; appendStringInfoString(q->sql, ident); + } + return true; } @@ -149,6 +137,7 @@ plproxy_query_finish(QueryBuffer *q) len = q->arg_count * sizeof(int); pq->arg_lookup = palloc(len); pq->plan = NULL; + memcpy(pq->arg_lookup, q->arg_lookup, len); MemoryContextSwitchTo(old); @@ -247,7 +236,7 @@ plproxy_standard_query(ProxyFunction *func, bool add_types) * Prepare ProxyQuery for local execution */ void -plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q) +plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, bool split_support) { int i; Oid types[FUNC_MAX_ARGS]; @@ -258,7 +247,11 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery * { int idx = q->arg_lookup[i]; - types[i] = func->arg_types[idx]->type_oid; + if (split_support && IS_SPLIT_ARG(func, idx)) + /* for SPLIT arguments use array element type instead */ + types[i] = func->arg_types[idx]->elem_type; + else + types[i] = func->arg_types[idx]->type_oid; } /* prepare & store plan */ @@ -272,12 +265,12 @@ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery * * Result will be in SPI_tuptable. */ void -plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q) +plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q, + DatumArray **array_params, int array_row) { int i, idx, err; - ProxyType *type; char arg_nulls[FUNC_MAX_ARGS]; Datum arg_values[FUNC_MAX_ARGS]; @@ -285,12 +278,19 @@ plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q) for (i = 0; i < q->arg_count; i++) { idx = q->arg_lookup[i]; - type = func->arg_types[idx]; + if (PG_ARGISNULL(idx)) { arg_nulls[i] = 'n'; arg_values[i] = (Datum) NULL; } + else if (array_params && IS_SPLIT_ARG(func, idx)) + { + DatumArray *ats = array_params[idx]; + + arg_nulls[i] = ats->nulls[array_row] ? 'n' : ' '; + arg_values[i] = ats->nulls[array_row] ? (Datum) NULL : ats->values[array_row]; + } else { arg_nulls[i] = ' '; diff --git a/src/scanner.l b/src/scanner.l index cb203b7..36166c9 100644 --- a/src/scanner.l +++ b/src/scanner.l @@ -179,6 +179,7 @@ run { return RUN; } on { return ON; } all { return ALL; } any { return ANY; } +split { return SPLIT; } select { BEGIN(sql); yylval.str = yytext; return SELECT; } /* function call */ @@ -253,6 +253,10 @@ plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send) type->for_send = for_send; type->by_value = s_type->typbyval; type->name = plproxy_func_strdup(func, namebuf); + type->is_array = (s_type->typelem != 0 && s_type->typlen == -1); + type->elem_type = s_type->typelem; + type->alignment = s_type->typalign; + type->length = s_type->typlen; /* decide what function is needed */ if (for_send) |
