summaryrefslogtreecommitdiff
path: root/src/veil_query.c
blob: ba14bac84c13a0a2c7bf507c3da102ba7dbb0c26 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
/**
 * @file   veil_query.c
 * \code
 *     Author:       Marc Munro
 *     Copyright (c) 2005 - 2011 Marc Munro
 *     License:      BSD
 * 
 * \endcode
 * @brief  
 * Functions to simplify SPI-based queries.  These are way more
 * sophisticated than veil really needs but are nice and generic.
 * 
 */


#include <stdio.h>
#include "postgres.h"
#include "catalog/pg_type.h"
#include "executor/spi.h"
#include "veil_version.h"
#include "access/xact.h"
#include "veil_funcs.h"

/**
 * A Fetch_fn is a function that processes records, one at a time,
 * returned from a query.
 *
 * It is handed the tuple to process, the descriptor for that tuple and
 * the opaque pointer that was passed to ::query as fn_param.  Returning
 * true lets ::query move on to the next record; returning false makes
 * ::query stop after the current one.
 */
typedef bool (Fetch_fn)(HeapTuple, TupleDesc, void *);



/**
 * The number of records to fetch in one go from the query executor.
 *
 * NOTE(review): nothing in this file references this macro; ::query
 * walks the complete SPI result in a single pass.  Confirm whether it
 * is still needed.
 */
#define FETCH_SIZE    20


/**
 * Open an SPI connection, nesting inside an existing one if necessary.
 *
 * SPI_connect() is attempted first.  If it reports SPI_ERROR_CONNECT,
 * the current connection is pushed with SPI_push() and SPI_connect()
 * is attempted once more.
 *
 * \param p_pushed Set to true if SPI_push() was called, false
 * otherwise.  Pass this value to vl_spi_finish() so that the matching
 * SPI_pop() is performed.
 * \return The result of the last SPI_connect() call that was made.
 */
int
vl_spi_connect(bool *p_pushed)
{
	int result = SPI_connect();

	if (result == SPI_ERROR_CONNECT) {
		/* Presumably we are already connected: nest a new connection. */
		SPI_push();
		*p_pushed = true;
		return SPI_connect();
	}
	*p_pushed = false;
	return result;
}

/**
 * Counterpart of vl_spi_connect(): close the SPI connection and, if
 * vl_spi_connect() had to push the previous connection, pop it again.
 *
 * \param pushed The value vl_spi_connect() stored through its p_pushed
 * argument.
 * \return The result of SPI_finish().
 */
int
vl_spi_finish(bool pushed)
{
	int finish_status = SPI_finish();

	if (!pushed) {
		return finish_status;
	}

	/* Undo the SPI_push() performed at connect time. */
	SPI_pop();
	return finish_status;
}

/**
 * Prepare a query for query().  This creates (or re-uses) a plan and
 * executes it.  The caller must have established SPI_connect.  It is
 * assumed that no parameters to the query will be null.
 * \param qry The text of the SQL query to be performed.
 * \param nargs The number of input parameters ($1, $2, etc) to the query
 * \param argtypes Pointer to an array containing the OIDs of the data
 * types of the parameters
 * \param args Actual parameters
 * \param read_only Whether the query should be read-only or not
 * \param saved_plan Address of void pointer into which the query plan
 * will be saved.  Passing the same void pointer on a subsequent call
 * will cause the saved query plan to be re-used.  May be NULL, in which
 * case the plan is neither looked up nor saved.
 */
static void
prepare_query(const char *qry,
			  int nargs,
			  Oid *argtypes,
			  Datum *args,
			  bool read_only,
			  void **saved_plan)
{
	/* Start from the caller's cached plan, if there is one. */
	void   *active_plan = saved_plan ? *saved_plan : NULL;
	int     status;

	if (active_plan == NULL) {
		/* Nothing cached: build a fresh plan. */
		active_plan = SPI_prepare(qry, nargs, argtypes);
		if (active_plan == NULL) {
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("prepare_query fails"),
					 errdetail("SPI_prepare('%s') returns NULL "
							   "(SPI_result = %d)",
							   qry, SPI_result)));
		}

		if (saved_plan != NULL) {
			/* The caller gave us a slot, so keep a saved copy there. */
			*saved_plan = SPI_saveplan(active_plan);
		}
	}

	/* No null-flags array is passed: parameters are assumed non-null. */
	status = SPI_execute_plan(active_plan, args, NULL, read_only, 0);
	if (status < 0) {
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("prepare_query fails"),
				 errdetail("SPI_execute_plan('%s') returns error %d",
						   qry, status)));
	}
}

/**
 * Prepare and execute a query.  Query execution consists of a call to
 * process_row for each returned record.  Process_row can return a
 * single value to the caller of this function through the fn_param
 * parameter.  It is the caller's responsibility to establish an SPI
 * connection with SPI_connect.  It is assumed that no parameters to
 * the query, and no results will be null.
 * \param qry The text of the SQL query to be performed.
 * \param nargs The number of input parameters ($1, $2, etc) to the query
 * \param argtypes Pointer to an array containing the OIDs of the data
 * types of the parameters
 * \param args Actual parameters
 * \param read_only Whether the query should be read-only or not
 * \param saved_plan Address of void pointer into which the query plan
 * will be saved.  Passing the same void pointer on a subsequent call
 * will cause the saved query plan to be re-used.
 * \param process_row  The ::Fetch_fn function to be called for each
 * fetched row to process it.  If this is null, we simply count the rows,
 * doing no processing on the tuples returned.
 * \param fn_param  An optional parameter to the process_row function.
 * This may be used to return a value to the caller.
 * \return The number of records handed to process_row (processing
 * stops as soon as process_row returns false).  When process_row is
 * null, or the statement produced no tuple table, the row count
 * reported by SPI is returned instead.
 */
static int
query(const char *qry,
      int nargs,
      Oid *argtypes,
      Datum *args,
	  bool  read_only,
      void **saved_plan,
      Fetch_fn process_row,
      void *fn_param)
{
	SPITupleTable *tuptab;
	int  fetched;
	int  processed = 0;
	int  row;

	prepare_query(qry, nargs, argtypes, args, read_only, saved_plan);

	/*
	 * Take local copies of the SPI result up front: process_row may
	 * itself run further queries (exec_init_fn does).
	 */
	fetched = (int) SPI_processed;
	tuptab = SPI_tuptable;

	if (process_row == NULL || tuptab == NULL) {
		/* Nothing to call, or nothing to call it on: just count. */
		return fetched;
	}

	for (row = 0; row < fetched; row++) {
		processed++;
		/* Process a row using the processor function */
		if (!process_row(tuptab->vals[row], tuptab->tupdesc, fn_param)) {
			break;
		}
	}
	return processed;
}


/**
 * ::Fetch_fn function for processing a single row of a single boolean
 * for ::query.
 * \param tuple The row to be processed
 * \param tupdesc Descriptor for the types of the fields in the tuple.
 * \param p_result Pointer to a bool variable into which the value of
 * the first column of the row will be placed.
 * \return false.  This causes ::query to terminate after processing a
 * single row.
 */
static bool
fetch_one_bool(HeapTuple tuple, TupleDesc tupdesc, void *p_result)
{
	/* SPI_getbinval() needs somewhere to report nullness; it is not used. */
	bool is_null = false;
	bool col = DatumGetBool(SPI_getbinval(tuple, tupdesc, 1, &is_null));

	*((bool *) p_result) = col;

	return false;
}

/**
 * ::Fetch_fn function for processing a single row of a single string
 * for ::query.
 * \param tuple The row to be processed
 * \param tupdesc Descriptor for the types of the fields in the tuple.
 * \param p_result Pointer to a char * variable which will be set to the
 * string returned by SPI_getvalue() for the first column of the row.
 * \return false.  This causes ::query to terminate after processing a
 * single row.
 */
static bool
fetch_one_str(HeapTuple tuple, TupleDesc tupdesc, void *p_result)
{
	char **dest = (char **) p_result;

	/* Hand the text form of column 1 straight to the caller. */
	*dest = SPI_getvalue(tuple, tupdesc, 1);

	return false;
}

/**
 * Executes a query that returns a single bool value.  The query takes
 * no parameters and is run through ::query, so an SPI connection must
 * already have been established by the caller.
 *
 * @param qry The text of the query to be performed.
 * @param result Variable into which the result of the query will be
 * placed.  It is left untouched if the query returns no record.
 *
 * @return true if the query returned a record, false otherwise.
 */
bool
vl_bool_from_query(const char *qry,
				   bool *result)
{
	/* No query parameters: with nargs == 0 no arrays are needed. */
	int rows = query(qry, 0, NULL, NULL, false, NULL,
					 fetch_one_bool, (void *) result);

	return (rows > 0);
}

/** 
 * Executes a query by oid, that returns a single string value.
 * 
 * @param qry The text of the query to be performed.
 * @param param The oid of the row to be fetched.
 * @param result Variable into which the result of the query will be placed.
 * 
 * @return true if the query returned a record, false otherwise.
 */
static bool
str_from_oid_query(const char *qry,
				   const Oid param,
				   char *result)
{
	int     rows;
    Oid     argtypes[1] = {OIDOID};
    Datum   args[1];
	
	args[0] = ObjectIdGetDatum(param);
	rows = query(qry, 1, argtypes, args, false, NULL, 
				 fetch_one_str, (void *)result);
	return (rows > 0);
}

/**
 * Determine whether the given oid represents an existing database or not.
 * This looks the oid up in pg_database via str_from_oid_query(); the
 * name that is fetched is discarded.
 *
 * @param db_id Oid of the database in which we are interested.
 *
 * @result True if the database exists.
 */
extern bool
vl_db_exists(Oid db_id)
{
	const char *lookup_sql = "select datname from pg_database where oid = $1";
	char        name_buf[NAMEDATALEN + 1];
	bool        found;

	/* Only the presence of a row matters here, not the name itself. */
	found = str_from_oid_query(lookup_sql, db_id, name_buf);
	return found;
}


/**
 * ::Fetch_fn function for executing registered veil_init() functions for
 * ::query.  The first column of the row is taken as the name of a
 * function, which is invoked as "select <name>(true|false)" on a nested
 * SPI connection.  The value returned by that function is discarded.
 * \param tuple The row to be processed
 * \param tupdesc Descriptor for the types of the fields in the tuple.
 * \param p_param Pointer to a boolean value which is the value of the
 * argument to the init function being called.
 * \return true.  This allows ::query to process further rows.
 */
static bool
exec_init_fn(HeapTuple tuple, TupleDesc tupdesc, void *p_param)
{
	const char *arg = *((bool *) p_param) ? "true" : "false";
	char   *col = SPI_getvalue(tuple, tupdesc, 1);
	char   *qry;
	size_t  qry_size;
	bool    pushed;
	bool    result;
	int     ok;

	if (col == NULL) {
		/* A null function name cannot be turned into a call. */
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to execute exec_init_fn() (0)"),
				 errdetail("Init function name is null.")));
	}

	/*
	 * The sizeof term covers the fixed text, the longer of the two
	 * possible arguments and the terminating NUL.
	 */
	qry_size = strlen(col) + sizeof("select (false)");
	qry = palloc(qry_size);
	(void) snprintf(qry, qry_size, "select %s(%s)", col, arg);

	ok = vl_spi_connect(&pushed);
	if (ok != SPI_OK_CONNECT) {
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to execute exec_init_fn() (1)"),
				 errdetail("SPI_connect() failed, returning %d.", ok)));
	}

	/* Only the side effects of the init function are of interest. */
	(void) vl_bool_from_query(qry, &result);

	ok = vl_spi_finish(pushed);
	if (ok != SPI_OK_FINISH) {
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to execute exec_init_fn() (2)"),
				 errdetail("SPI_finish() failed, returning %d.", ok)));
	}

	return true;
}


/**
 * Identify any registered init_functions and execute them.  The
 * function names are read from veil.veil_init_fns in priority order and
 * each one is run by exec_init_fn().  An SPI connection is opened for
 * the duration of the call and closed again afterwards.
 *
 * @param param The boolean parameter to be passed to each init_function.
 *
 * @result The number of init_functions executed.
 */
int
vl_call_init_fns(bool param)
{
	const char *qry =
		"select fn_name from veil.veil_init_fns order by priority";
	bool    pushed;
	int     rows;
	int     ok;

	ok = vl_spi_connect(&pushed);
	if (ok != SPI_OK_CONNECT) {
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to execute vl_call_init_fns() (1)"),
				 errdetail("SPI_connect() failed, returning %d.", ok)));
	}

	/* No query parameters: with nargs == 0 no arrays are needed. */
	rows = query(qry, 0, NULL, NULL, false, NULL,
				 exec_init_fn, (void *) &param);

	ok = vl_spi_finish(pushed);
	if (ok != SPI_OK_FINISH) {
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("failed to execute vl_call_init_fns() (2)"),
				 errdetail("SPI_finish() failed, returning %d.", ok)));
	}
	return rows;
}