Refactor crosstab() to build and return a tuplestore instead of using
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Dec 2008 01:30:18 +0000 (01:30 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 1 Dec 2008 01:30:18 +0000 (01:30 +0000)
value-per-call mode.  This should be more efficient in normal usage,
but the real problem with the prior coding was that it returned with
a SPI call still active.  That could cause problems if execution was
interleaved with anything else that might use SPI.

contrib/tablefunc/tablefunc.c

index 28b7a64f439de3438bdedf80abfaabd5543527a9..c6e452fc403b83900398266d5b7d48674b55bf04 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * $PostgreSQL: pgsql/contrib/tablefunc/tablefunc.c,v 1.56 2008/11/30 23:23:52 tgl Exp $ 
+ * $PostgreSQL: pgsql/contrib/tablefunc/tablefunc.c,v 1.57 2008/12/01 01:30:18 tgl Exp $ 
  *
  *
  * tablefunc
@@ -94,12 +94,6 @@ typedef struct
    bool        use_carry;      /* use second generated value */
 }  normal_rand_fctx;
 
-typedef struct
-{
-   SPITupleTable *spi_tuptable;    /* sql results from user query */
-   char       *lastrowid;      /* rowid of the last tuple sent */
-}  crosstab_fctx;
-
 #define xpfree(var_) \
    do { \
        if (var_ != NULL) \
@@ -356,304 +350,254 @@ PG_FUNCTION_INFO_V1(crosstab);
 Datum
 crosstab(PG_FUNCTION_ARGS)
 {
-   FuncCallContext *funcctx;
-   TupleDesc   ret_tupdesc;
+   char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+   ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+   Tuplestorestate *tupstore;
+   TupleDesc   tupdesc;
    int         call_cntr;
    int         max_calls;
    AttInMetadata *attinmeta;
-   SPITupleTable *spi_tuptable = NULL;
+   SPITupleTable *spi_tuptable;
    TupleDesc   spi_tupdesc;
-   char       *lastrowid = NULL;
-   crosstab_fctx *fctx;
+   bool        firstpass;
+   char       *lastrowid;
    int         i;
    int         num_categories;
-   bool        firstpass = false;
+   MemoryContext per_query_ctx;
    MemoryContext oldcontext;
+   int         ret;
+   int         proc;
 
-   /* stuff done only on the first call of the function */
-   if (SRF_IS_FIRSTCALL())
-   {
-       char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
-       TupleDesc   tupdesc;
-       int         ret;
-       int         proc;
-
-       /* create a function context for cross-call persistence */
-       funcctx = SRF_FIRSTCALL_INIT();
+   /* check to see if caller supports us returning a tuplestore */
+   if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("set-valued function called in context that cannot accept a set")));
+   if (!(rsinfo->allowedModes & SFRM_Materialize))
+       ereport(ERROR,
+               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                errmsg("materialize mode required, but it is not " \
+                       "allowed in this context")));
 
-       /* Connect to SPI manager */
-       if ((ret = SPI_connect()) < 0)
-           /* internal error */
-           elog(ERROR, "crosstab: SPI_connect returned %d", ret);
+   per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
 
-       /* Retrieve the desired rows */
-       ret = SPI_execute(sql, true, 0);
-       proc = SPI_processed;
+   /* Connect to SPI manager */
+   if ((ret = SPI_connect()) < 0)
+       /* internal error */
+       elog(ERROR, "crosstab: SPI_connect returned %d", ret);
 
-       /* Check for qualifying tuples */
-       if ((ret == SPI_OK_SELECT) && (proc > 0))
-       {
-           spi_tuptable = SPI_tuptable;
-           spi_tupdesc = spi_tuptable->tupdesc;
-
-           /*----------
-            * The provided SQL query must always return three columns.
-            *
-            * 1. rowname
-            *  the label or identifier for each row in the final result
-            * 2. category
-            *  the label or identifier for each column in the final result
-            * 3. values
-            *  the value for each column in the final result
-            *----------
-            */
-           if (spi_tupdesc->natts != 3)
-               ereport(ERROR,
-                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                        errmsg("invalid source data SQL statement"),
-                        errdetail("The provided SQL must return 3 "
-                                  "columns: rowid, category, and values.")));
-       }
-       else
-       {
-           /* no qualifying tuples */
-           SPI_finish();
-           SRF_RETURN_DONE(funcctx);
-       }
+   /* Retrieve the desired rows */
+   ret = SPI_execute(sql, true, 0);
+   proc = SPI_processed;
 
-       /* get a tuple descriptor for our result type */
-       switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-       {
-           case TYPEFUNC_COMPOSITE:
-               /* success */
-               break;
-           case TYPEFUNC_RECORD:
-               /* failed to determine actual type of RECORD */
-               ereport(ERROR,
-                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-               break;
-           default:
-               /* result type isn't composite */
-               elog(ERROR, "return type must be a row type");
-               break;
-       }
+   /* If no qualifying tuples, fall out early */
+   if (ret != SPI_OK_SELECT || proc <= 0)
+   {
+       SPI_finish();
+       rsinfo->isDone = ExprEndResult;
+       PG_RETURN_NULL();
+   }
 
-       /*
-        * Check that return tupdesc is compatible with the data we got from
-        * SPI, at least based on number and type of attributes
-        */
-       if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
-           ereport(ERROR,
-                   (errcode(ERRCODE_SYNTAX_ERROR),
-                    errmsg("return and sql tuple descriptions are " \
-                           "incompatible")));
+   spi_tuptable = SPI_tuptable;
+   spi_tupdesc = spi_tuptable->tupdesc;
 
-       /*
-        * switch to memory context appropriate for multiple function calls
-        */
-       oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+   /*----------
+    * The provided SQL query must always return three columns.
+    *
+    * 1. rowname
+    *  the label or identifier for each row in the final result
+    * 2. category
+    *  the label or identifier for each column in the final result
+    * 3. values
+    *  the value for each column in the final result
+    *----------
+    */
+   if (spi_tupdesc->natts != 3)
+       ereport(ERROR,
+               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                errmsg("invalid source data SQL statement"),
+                errdetail("The provided SQL must return 3 "
+                          "columns: rowid, category, and values.")));
 
-       /* make sure we have a persistent copy of the tupdesc */
-       tupdesc = CreateTupleDescCopy(tupdesc);
+   /* get a tuple descriptor for our result type */
+   switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+   {
+       case TYPEFUNC_COMPOSITE:
+           /* success */
+           break;
+       case TYPEFUNC_RECORD:
+           /* failed to determine actual type of RECORD */
+           ereport(ERROR,
+                   (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                    errmsg("function returning record called in context "
+                           "that cannot accept type record")));
+           break;
+       default:
+           /* result type isn't composite */
+           elog(ERROR, "return type must be a row type");
+           break;
+   }
 
-       /*
-        * Generate attribute metadata needed later to produce tuples from raw
-        * C strings
-        */
-       attinmeta = TupleDescGetAttInMetadata(tupdesc);
-       funcctx->attinmeta = attinmeta;
+   /*
+    * Check that return tupdesc is compatible with the data we got from
+    * SPI, at least based on number and type of attributes
+    */
+   if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
+       ereport(ERROR,
+               (errcode(ERRCODE_SYNTAX_ERROR),
+                errmsg("return and sql tuple descriptions are " \
+                       "incompatible")));
 
-       /* allocate memory for user context */
-       fctx = (crosstab_fctx *) palloc(sizeof(crosstab_fctx));
+   /*
+    * switch to long-lived memory context
+    */
+   oldcontext = MemoryContextSwitchTo(per_query_ctx);
 
-       /*
-        * Save spi data for use across calls
-        */
-       fctx->spi_tuptable = spi_tuptable;
-       fctx->lastrowid = NULL;
-       funcctx->user_fctx = fctx;
+   /* make sure we have a persistent copy of the result tupdesc */
+   tupdesc = CreateTupleDescCopy(tupdesc);
 
-       /* total number of tuples to be returned */
-       funcctx->max_calls = proc;
+   /* initialize our tuplestore in long-lived context */
+   tupstore =
+       tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
+                             false, work_mem);
 
-       MemoryContextSwitchTo(oldcontext);
-       firstpass = true;
-   }
-
-   /* stuff done on every call of the function */
-   funcctx = SRF_PERCALL_SETUP();
+   MemoryContextSwitchTo(oldcontext);
 
    /*
-    * initialize per-call variables
+    * Generate attribute metadata needed later to produce tuples from raw
+    * C strings
     */
-   call_cntr = funcctx->call_cntr;
-   max_calls = funcctx->max_calls;
-
-   /* user context info */
-   fctx = (crosstab_fctx *) funcctx->user_fctx;
-   lastrowid = fctx->lastrowid;
-   spi_tuptable = fctx->spi_tuptable;
-
-   /* the sql tuple */
-   spi_tupdesc = spi_tuptable->tupdesc;
+   attinmeta = TupleDescGetAttInMetadata(tupdesc);
 
-   /* attribute return type and return tuple description */
-   attinmeta = funcctx->attinmeta;
-   ret_tupdesc = attinmeta->tupdesc;
+   /* total number of tuples to be examined */
+   max_calls = proc;
 
    /* the return tuple always must have 1 rowid + num_categories columns */
-   num_categories = ret_tupdesc->natts - 1;
+   num_categories = tupdesc->natts - 1;
 
-   if (call_cntr < max_calls)  /* do when there is more left to send */
+   firstpass = true;
+   lastrowid = NULL;
+
+   for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
    {
-       HeapTuple   tuple;
-       Datum       result;
-       char      **values;
        bool        skip_tuple = false;
+       char      **values;
+
+       /* allocate and zero space */
+       values = (char **) palloc0((1 + num_categories) * sizeof(char *));
 
-       while (true)
+       /*
+        * now loop through the sql results and assign each value in
+        * sequence to the next category
+        */
+       for (i = 0; i < num_categories; i++)
        {
-           /* allocate space */
-           values = (char **) palloc((1 + num_categories) * sizeof(char *));
+           HeapTuple   spi_tuple;
+           char       *rowid;
+
+           /* see if we've gone too far already */
+           if (call_cntr >= max_calls)
+               break;
 
-           /* and make sure it's clear */
-           memset(values, '\0', (1 + num_categories) * sizeof(char *));
+           /* get the next sql result tuple */
+           spi_tuple = spi_tuptable->vals[call_cntr];
+
+           /* get the rowid from the current sql result tuple */
+           rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
 
            /*
-            * now loop through the sql results and assign each value in
-            * sequence to the next category
+            * If this is the first pass through the values for this
+            * rowid, set the first column to rowid
             */
-           for (i = 0; i < num_categories; i++)
+           if (i == 0)
            {
-               HeapTuple   spi_tuple;
-               char       *rowid = NULL;
-
-               /* see if we've gone too far already */
-               if (call_cntr >= max_calls)
-                   break;
-
-               /* get the next sql result tuple */
-               spi_tuple = spi_tuptable->vals[call_cntr];
-
-               /* get the rowid from the current sql result tuple */
-               rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
-
-               /*
-                * If this is the first pass through the values for this
-                * rowid, set the first column to rowid
-                */
-               if (i == 0)
-               {
-                   xpstrdup(values[0], rowid);
-
-                   /*
-                    * Check to see if the rowid is the same as that of the
-                    * last tuple sent -- if so, skip this tuple entirely
-                    */
-                   if (!firstpass && xstreq(lastrowid, rowid))
-                   {
-                       skip_tuple = true;
-                       break;
-                   }
-               }
+               xpstrdup(values[0], rowid);
 
                /*
-                * If rowid hasn't changed on us, continue building the ouput
-                * tuple.
+                * Check to see if the rowid is the same as that of the
+                * last tuple sent -- if so, skip this tuple entirely
                 */
-               if (xstreq(rowid, values[0]))
-               {
-                   /*
-                    * Get the next category item value, which is always
-                    * attribute number three.
-                    *
-                    * Be careful to assign the value to the array index based
-                    * on which category we are presently processing.
-                    */
-                   values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
-
-                   /*
-                    * increment the counter since we consume a row for each
-                    * category, but not for last pass because the API will do
-                    * that for us
-                    */
-                   if (i < (num_categories - 1))
-                       call_cntr = ++funcctx->call_cntr;
-               }
-               else
+               if (!firstpass && xstreq(lastrowid, rowid))
                {
-                   /*
-                    * We'll fill in NULLs for the missing values, but we need
-                    * to decrement the counter since this sql result row
-                    * doesn't belong to the current output tuple.
-                    */
-                   call_cntr = --funcctx->call_cntr;
+                   xpfree(rowid);
+                   skip_tuple = true;
                    break;
                }
-               xpfree(rowid);
            }
 
            /*
-            * switch to memory context appropriate for multiple function
-            * calls
+            * If rowid hasn't changed on us, continue building the output
+            * tuple.
             */
-           oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
-           xpfree(fctx->lastrowid);
-           xpstrdup(fctx->lastrowid, values[0]);
-           lastrowid = fctx->lastrowid;
-
-           MemoryContextSwitchTo(oldcontext);
-
-           if (!skip_tuple)
+           if (xstreq(rowid, values[0]))
            {
-               /* build the tuple */
-               tuple = BuildTupleFromCStrings(attinmeta, values);
-
-               /* make the tuple into a datum */
-               result = HeapTupleGetDatum(tuple);
-
-               /* Clean up */
-               for (i = 0; i < num_categories + 1; i++)
-                   if (values[i] != NULL)
-                       xpfree(values[i]);
-               xpfree(values);
+               /*
+                * Get the next category item value, which is always
+                * attribute number three.
+                *
+                * Be careful to assign the value to the array index based
+                * on which category we are presently processing.
+                */
+               values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
 
-               SRF_RETURN_NEXT(funcctx, result);
+               /*
+                * increment the counter since we consume a row for each
+                * category, but not for last pass because the outer loop
+                * will do that for us
+                */
+               if (i < (num_categories - 1))
+                   call_cntr++;
+               xpfree(rowid);
            }
            else
            {
                /*
-                * Skipping this tuple entirely, but we need to advance the
-                * counter like the API would if we had returned one.
+                * We'll fill in NULLs for the missing values, but we need
+                * to decrement the counter since this sql result row
+                * doesn't belong to the current output tuple.
                 */
-               call_cntr = ++funcctx->call_cntr;
+               call_cntr--;
+               xpfree(rowid);
+               break;
+           }
+       }
 
-               /* we'll start over at the top */
-               xpfree(values);
+       if (!skip_tuple)
+       {
+           HeapTuple   tuple;
 
-               /* see if we've gone too far already */
-               if (call_cntr >= max_calls)
-               {
-                   /* release SPI related resources */
-                   SPI_finish();
-                   SRF_RETURN_DONE(funcctx);
-               }
+           /* build the tuple */
+           tuple = BuildTupleFromCStrings(attinmeta, values);
 
-               /* need to reset this before the next tuple is started */
-               skip_tuple = false;
-           }
+           /* switch to appropriate context while storing the tuple */
+           oldcontext = MemoryContextSwitchTo(per_query_ctx);
+           tuplestore_puttuple(tupstore, tuple);
+           MemoryContextSwitchTo(oldcontext);
+
+           heap_freetuple(tuple);
        }
+
+       /* Remember current rowid */
+       xpfree(lastrowid);
+       xpstrdup(lastrowid, values[0]);
+       firstpass = false;
+
+       /* Clean up */
+       for (i = 0; i < num_categories + 1; i++)
+           if (values[i] != NULL)
+               pfree(values[i]);
+       pfree(values);
    }
-   else
-       /* do when there is no more left */
-   {
-       /* release SPI related resources */
-       SPI_finish();
-       SRF_RETURN_DONE(funcctx);
-   }
+
+   /* let the caller know we're sending back a tuplestore */
+   rsinfo->returnMode = SFRM_Materialize;
+   rsinfo->setResult = tupstore;
+   rsinfo->setDesc = tupdesc;
+
+   /* release SPI related resources (and return to caller's context) */
+   SPI_finish();
+
+   return (Datum) 0;
 }
 
 /*
@@ -1613,6 +1557,10 @@ compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
    Form_pg_attribute sql_attr;
    Oid         sql_atttypid;
 
+   if (ret_tupdesc->natts < 2 ||
+       sql_tupdesc->natts < 3)
+       return false;
+
    /* check the rowid types match */
    ret_atttypid = ret_tupdesc->attrs[0]->atttypid;
    sql_atttypid = sql_tupdesc->attrs[0]->atttypid;