Allow the built-in ordered-set aggregates to share transition state.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 16 Oct 2017 19:51:23 +0000 (15:51 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 16 Oct 2017 19:51:23 +0000 (15:51 -0400)
The built-in OSAs all share the same transition function, so they can
share transition state as long as the final functions cooperate to not
do the sort step more than once.  To avoid running the tuplesort object
in randomAccess mode unnecessarily, add a bit of infrastructure to
nodeAgg.c to let the aggregate functions find out whether the transition
state is actually being shared or not.

This doesn't work for the hypothetical aggregates, since those inject
a hypothetical row that isn't traceable to the shared input state.
So they remain marked aggfinalmodify = 'w'.

Discussion: https://postgr.es/m/CAB4ELO5RZhOamuT9Xsf72ozbenDLLXZKSk07FiSVsuJNZB861A@mail.gmail.com

src/backend/executor/nodeAgg.c
src/backend/utils/adt/orderedsetaggs.c
src/include/catalog/catversion.h
src/include/catalog/pg_aggregate.h
src/include/fmgr.h
src/test/regress/expected/aggregates.out
src/test/regress/sql/aggregates.sql

index 8a6dfd64e8bc3a65a61828560e3f1eead4d1781e..82ed5b3e1cbfeaeb941c0c764f728c21c6858429 100644 (file)
@@ -254,6 +254,11 @@ typedef struct AggStatePerTransData
     */
    Aggref     *aggref;
 
+   /*
+    * Is this state value actually being shared by more than one Aggref?
+    */
+   bool        aggshared;
+
    /*
     * Nominal number of arguments for aggregate function.  For plain aggs,
     * this excludes any ORDER BY expressions.  For ordered-set aggs, this
@@ -3360,9 +3365,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
        {
            /*
             * Existing compatible trans found, so just point the 'peragg' to
-            * the same per-trans struct.
+            * the same per-trans struct, and mark the trans state as shared.
             */
            pertrans = &pertransstates[existing_transno];
+           pertrans->aggshared = true;
            peragg->transno = existing_transno;
        }
        else
@@ -3512,6 +3518,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
 
    /* Begin filling in the pertrans data */
    pertrans->aggref = aggref;
+   pertrans->aggshared = false;
    pertrans->aggCollation = aggref->inputcollid;
    pertrans->transfn_oid = aggtransfn;
    pertrans->serialfn_oid = aggserialfn;
@@ -4161,17 +4168,18 @@ AggGetAggref(FunctionCallInfo fcinfo)
 {
    if (fcinfo->context && IsA(fcinfo->context, AggState))
    {
+       AggState   *aggstate = (AggState *) fcinfo->context;
        AggStatePerAgg curperagg;
        AggStatePerTrans curpertrans;
 
        /* check curperagg (valid when in a final function) */
-       curperagg = ((AggState *) fcinfo->context)->curperagg;
+       curperagg = aggstate->curperagg;
 
        if (curperagg)
            return curperagg->aggref;
 
        /* check curpertrans (valid when in a transition function) */
-       curpertrans = ((AggState *) fcinfo->context)->curpertrans;
+       curpertrans = aggstate->curpertrans;
 
        if (curpertrans)
            return curpertrans->aggref;
@@ -4201,6 +4209,44 @@ AggGetTempMemoryContext(FunctionCallInfo fcinfo)
    return NULL;
 }
 
+/*
+ * AggStateIsShared - find out whether transition state is shared
+ *
+ * If the function is being called as an aggregate support function,
+ * return TRUE if the aggregate's transition state is shared across
+ * multiple aggregates, FALSE if it is not.
+ *
+ * Returns TRUE if not called as an aggregate support function.
+ * This is intended as a conservative answer, ie "no you'd better not
+ * scribble on your input".  In particular, will return TRUE if the
+ * aggregate is being used as a window function, which is a scenario
+ * in which changing the transition state is a bad idea.  We might
+ * want to refine the behavior for the window case in future.
+ */
+bool
+AggStateIsShared(FunctionCallInfo fcinfo)
+{
+   if (fcinfo->context && IsA(fcinfo->context, AggState))
+   {
+       AggState   *aggstate = (AggState *) fcinfo->context;
+       AggStatePerAgg curperagg;
+       AggStatePerTrans curpertrans;
+
+       /* check curperagg (valid when in a final function) */
+       curperagg = aggstate->curperagg;
+
+       if (curperagg)
+           return aggstate->pertrans[curperagg->transno].aggshared;
+
+       /* check curpertrans (valid when in a transition function) */
+       curpertrans = aggstate->curpertrans;
+
+       if (curpertrans)
+           return curpertrans->aggshared;
+   }
+   return true;
+}
+
 /*
  * AggRegisterCallback - register a cleanup callback for an aggregate
  *
index 25905a3287e602668366113a9dc418334060e74d..1e323d94445af4eeb5d27f6d562a4f8324993f32 100644 (file)
  * create just once per query because they will not change across groups.
  * The per-query struct and subsidiary data live in the executor's per-query
  * memory context, and go away implicitly at ExecutorEnd().
+ *
+ * These structs are set up during the first call of the transition function.
+ * Because we allow nodeAgg.c to merge ordered-set aggregates (but not
+ * hypothetical aggregates) with identical inputs and transition functions,
+ * this info must not depend on the particular aggregate (ie, particular
+ * final-function), nor on the direct argument(s) of the aggregate.
  */
 
 typedef struct OSAPerQueryState
 {
-   /* Aggref for this aggregate: */
+   /* Representative Aggref for this aggregate: */
    Aggref     *aggref;
    /* Memory context containing this struct and other per-query data: */
    MemoryContext qcontext;
+   /* Do we expect multiple final-function calls within one group? */
+   bool        rescan_needed;
 
    /* These fields are used only when accumulating tuples: */
 
@@ -91,6 +99,8 @@ typedef struct OSAPerGroupState
    Tuplesortstate *sortstate;
    /* Number of normal rows inserted into sortstate: */
    int64       number_of_rows;
+   /* Have we already done tuplesort_performsort? */
+   bool        sort_done;
 } OSAPerGroupState;
 
 static void ordered_set_shutdown(Datum arg);
@@ -146,6 +156,9 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
        qstate->aggref = aggref;
        qstate->qcontext = qcontext;
 
+       /* We need to support rescans if the trans state is shared */
+       qstate->rescan_needed = AggStateIsShared(fcinfo);
+
        /* Extract the sort information */
        sortlist = aggref->aggorder;
        numSortCols = list_length(sortlist);
@@ -277,15 +290,18 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
                                                   qstate->sortOperators,
                                                   qstate->sortCollations,
                                                   qstate->sortNullsFirsts,
-                                                  work_mem, false);
+                                                  work_mem,
+                                                  qstate->rescan_needed);
    else
        osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
                                                    qstate->sortOperator,
                                                    qstate->sortCollation,
                                                    qstate->sortNullsFirst,
-                                                   work_mem, false);
+                                                   work_mem,
+                                                   qstate->rescan_needed);
 
    osastate->number_of_rows = 0;
+   osastate->sort_done = false;
 
    /* Now register a shutdown callback to clean things up at end of group */
    AggRegisterCallback(fcinfo,
@@ -306,14 +322,12 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
  * group) by ExecutorEnd.  But we must take care to release any potential
  * non-memory resources.
  *
- * This callback is arguably unnecessary, since we don't support use of
- * ordered-set aggs in AGG_HASHED mode and there is currently no non-error
- * code path in non-hashed modes wherein nodeAgg.c won't call the finalfn
- * after calling the transfn one or more times.  So in principle we could rely
- * on the finalfn to delete the tuplestore etc.  However, it's possible that
- * such a code path might exist in future, and in any case it'd be
- * notationally tedious and sometimes require extra data copying to ensure
- * we always delete the tuplestore in the finalfn.
+ * In the case where we're not expecting multiple finalfn calls, we could
+ * arguably rely on the finalfn to clean up; but it's easier and more testable
+ * if we just do it the same way in either case.  Note that many of the
+ * finalfns could *not* free the tuplesort object, at least not without extra
+ * data copying, because what they return is a pointer to a datum inside the
+ * tuplesort object.
  */
 static void
 ordered_set_shutdown(Datum arg)
@@ -436,8 +450,14 @@ percentile_disc_final(PG_FUNCTION_ARGS)
    if (osastate->number_of_rows == 0)
        PG_RETURN_NULL();
 
-   /* Finish the sort */
-   tuplesort_performsort(osastate->sortstate);
+   /* Finish the sort, or rescan if we already did */
+   if (!osastate->sort_done)
+   {
+       tuplesort_performsort(osastate->sortstate);
+       osastate->sort_done = true;
+   }
+   else
+       tuplesort_rescan(osastate->sortstate);
 
    /*----------
     * We need the smallest K such that (K/N) >= percentile.
@@ -457,13 +477,6 @@ percentile_disc_final(PG_FUNCTION_ARGS)
    if (!tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, NULL))
        elog(ERROR, "missing row in percentile_disc");
 
-   /*
-    * Note: we *cannot* clean up the tuplesort object here, because the value
-    * to be returned is allocated inside its sortcontext.  We could use
-    * datumCopy to copy it out of there, but it doesn't seem worth the
-    * trouble, since the cleanup callback will clear the tuplesort later.
-    */
-
    /* We shouldn't have stored any nulls, but do the right thing anyway */
    if (isnull)
        PG_RETURN_NULL();
@@ -543,8 +556,14 @@ percentile_cont_final_common(FunctionCallInfo fcinfo,
 
    Assert(expect_type == osastate->qstate->sortColType);
 
-   /* Finish the sort */
-   tuplesort_performsort(osastate->sortstate);
+   /* Finish the sort, or rescan if we already did */
+   if (!osastate->sort_done)
+   {
+       tuplesort_performsort(osastate->sortstate);
+       osastate->sort_done = true;
+   }
+   else
+       tuplesort_rescan(osastate->sortstate);
 
    first_row = floor(percentile * (osastate->number_of_rows - 1));
    second_row = ceil(percentile * (osastate->number_of_rows - 1));
@@ -575,13 +594,6 @@ percentile_cont_final_common(FunctionCallInfo fcinfo,
        val = lerpfunc(first_val, second_val, proportion);
    }
 
-   /*
-    * Note: we *cannot* clean up the tuplesort object here, because the value
-    * to be returned may be allocated inside its sortcontext.  We could use
-    * datumCopy to copy it out of there, but it doesn't seem worth the
-    * trouble, since the cleanup callback will clear the tuplesort later.
-    */
-
    PG_RETURN_DATUM(val);
 }
 
@@ -779,8 +791,14 @@ percentile_disc_multi_final(PG_FUNCTION_ARGS)
     */
    if (i < num_percentiles)
    {
-       /* Finish the sort */
-       tuplesort_performsort(osastate->sortstate);
+       /* Finish the sort, or rescan if we already did */
+       if (!osastate->sort_done)
+       {
+           tuplesort_performsort(osastate->sortstate);
+           osastate->sort_done = true;
+       }
+       else
+           tuplesort_rescan(osastate->sortstate);
 
        for (; i < num_percentiles; i++)
        {
@@ -804,11 +822,6 @@ percentile_disc_multi_final(PG_FUNCTION_ARGS)
        }
    }
 
-   /*
-    * We could clean up the tuplesort object after forming the array, but
-    * probably not worth the trouble.
-    */
-
    /* We make the output array the same shape as the input */
    PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
                                         ARR_NDIM(param),
@@ -902,8 +915,14 @@ percentile_cont_multi_final_common(FunctionCallInfo fcinfo,
     */
    if (i < num_percentiles)
    {
-       /* Finish the sort */
-       tuplesort_performsort(osastate->sortstate);
+       /* Finish the sort, or rescan if we already did */
+       if (!osastate->sort_done)
+       {
+           tuplesort_performsort(osastate->sortstate);
+           osastate->sort_done = true;
+       }
+       else
+           tuplesort_rescan(osastate->sortstate);
 
        for (; i < num_percentiles; i++)
        {
@@ -962,11 +981,6 @@ percentile_cont_multi_final_common(FunctionCallInfo fcinfo,
        }
    }
 
-   /*
-    * We could clean up the tuplesort object after forming the array, but
-    * probably not worth the trouble.
-    */
-
    /* We make the output array the same shape as the input */
    PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
                                         ARR_NDIM(param),
@@ -1043,8 +1057,14 @@ mode_final(PG_FUNCTION_ARGS)
 
    shouldfree = !(osastate->qstate->typByVal);
 
-   /* Finish the sort */
-   tuplesort_performsort(osastate->sortstate);
+   /* Finish the sort, or rescan if we already did */
+   if (!osastate->sort_done)
+   {
+       tuplesort_performsort(osastate->sortstate);
+       osastate->sort_done = true;
+   }
+   else
+       tuplesort_rescan(osastate->sortstate);
 
    /* Scan tuples and count frequencies */
    while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val))
@@ -1097,13 +1117,6 @@ mode_final(PG_FUNCTION_ARGS)
    if (shouldfree && !last_val_is_mode)
        pfree(DatumGetPointer(last_val));
 
-   /*
-    * Note: we *cannot* clean up the tuplesort object here, because the value
-    * to be returned is allocated inside its sortcontext.  We could use
-    * datumCopy to copy it out of there, but it doesn't seem worth the
-    * trouble, since the cleanup callback will clear the tuplesort later.
-    */
-
    if (mode_freq)
        PG_RETURN_DATUM(mode_val);
    else
@@ -1174,6 +1187,9 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag,
 
    hypothetical_check_argtypes(fcinfo, nargs, osastate->qstate->tupdesc);
 
+   /* because we need a hypothetical row, we can't share transition state */
+   Assert(!osastate->sort_done);
+
    /* insert the hypothetical row into the sort */
    slot = osastate->qstate->tupslot;
    ExecClearTuple(slot);
@@ -1190,6 +1206,7 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag,
 
    /* finish the sort */
    tuplesort_performsort(osastate->sortstate);
+   osastate->sort_done = true;
 
    /* iterate till we find the hypothetical row */
    while (tuplesort_gettupleslot(osastate->sortstate, true, true, slot, NULL))
@@ -1207,10 +1224,6 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag,
 
    ExecClearTuple(slot);
 
-   /* Might as well clean up the tuplesort object immediately */
-   tuplesort_end(osastate->sortstate);
-   osastate->sortstate = NULL;
-
    return rank;
 }
 
@@ -1329,6 +1342,9 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS)
    /* Get short-term context we can use for execTuplesMatch */
    tmpcontext = AggGetTempMemoryContext(fcinfo);
 
+   /* because we need a hypothetical row, we can't share transition state */
+   Assert(!osastate->sort_done);
+
    /* insert the hypothetical row into the sort */
    slot = osastate->qstate->tupslot;
    ExecClearTuple(slot);
@@ -1345,6 +1361,7 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS)
 
    /* finish the sort */
    tuplesort_performsort(osastate->sortstate);
+   osastate->sort_done = true;
 
    /*
     * We alternate fetching into tupslot and extraslot so that we have the
@@ -1391,10 +1408,6 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS)
 
    ExecDropSingleTupleTableSlot(extraslot);
 
-   /* Might as well clean up the tuplesort object immediately */
-   tuplesort_end(osastate->sortstate);
-   osastate->sortstate = NULL;
-
    rank = rank - duplicate_count;
 
    PG_RETURN_INT64(rank);
index 7c1756ae086d057c84cf576c357d583c350fc169..9a7f5b25a3a17d3647178b9ce277985a93fda611 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 201710141
+#define CATALOG_VERSION_NO 201710161
 
 #endif
index 5769f6430a03d81f4d6325a6016bc4c9a54f7312..13f1bce5afa37d44cd48cbf75ec3fa833070d22b 100644 (file)
@@ -318,13 +318,13 @@ DATA(insert ( 3267    n 0 jsonb_agg_transfn   jsonb_agg_finalfn               -   -   -   -               -               -
 DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn   -   -   -   -               -               -           f f r r 0   2281    0   0       0   _null_ _null_ ));
 
 /* ordered-set and hypothetical-set aggregates */
-DATA(insert ( 3972 o 1 ordered_set_transition          percentile_disc_final                   -   -   -   -       -       -       t f w w 0   2281    0   0       0   _null_ _null_ ));
-DATA(insert ( 3974 o 1 ordered_set_transition          percentile_cont_float8_final            -   -   -   -       -       -       f f w w 0   2281    0   0       0   _null_ _null_ ));
-DATA(insert ( 3976 o 1 ordered_set_transition          percentile_cont_interval_final          -   -   -   -       -       -       f f w w 0   2281    0   0       0   _null_ _null_ ));
-DATA(insert ( 3978 o 1 ordered_set_transition          percentile_disc_multi_final             -   -   -   -       -       -       t f w w 0   2281    0   0       0   _null_ _null_ ));
-DATA(insert ( 3980 o 1 ordered_set_transition          percentile_cont_float8_multi_final      -   -   -   -       -       -       f f w w 0   2281    0   0       0   _null_ _null_ ));
-DATA(insert ( 3982 o 1 ordered_set_transition          percentile_cont_interval_multi_final    -   -   -   -       -       -       f f w w 0   2281    0   0       0   _null_ _null_ ));
-DATA(insert ( 3984 o 0 ordered_set_transition          mode_final                              -   -   -   -       -       -       t f w w 0   2281    0   0       0   _null_ _null_ ));
+DATA(insert ( 3972 o 1 ordered_set_transition          percentile_disc_final                   -   -   -   -       -       -       t f s s 0   2281    0   0       0   _null_ _null_ ));
+DATA(insert ( 3974 o 1 ordered_set_transition          percentile_cont_float8_final            -   -   -   -       -       -       f f s s 0   2281    0   0       0   _null_ _null_ ));
+DATA(insert ( 3976 o 1 ordered_set_transition          percentile_cont_interval_final          -   -   -   -       -       -       f f s s 0   2281    0   0       0   _null_ _null_ ));
+DATA(insert ( 3978 o 1 ordered_set_transition          percentile_disc_multi_final             -   -   -   -       -       -       t f s s 0   2281    0   0       0   _null_ _null_ ));
+DATA(insert ( 3980 o 1 ordered_set_transition          percentile_cont_float8_multi_final      -   -   -   -       -       -       f f s s 0   2281    0   0       0   _null_ _null_ ));
+DATA(insert ( 3982 o 1 ordered_set_transition          percentile_cont_interval_multi_final    -   -   -   -       -       -       f f s s 0   2281    0   0       0   _null_ _null_ ));
+DATA(insert ( 3984 o 0 ordered_set_transition          mode_final                              -   -   -   -       -       -       t f s s 0   2281    0   0       0   _null_ _null_ ));
 DATA(insert ( 3986 h 1 ordered_set_transition_multi    rank_final                              -   -   -   -       -       -       t f w w 0   2281    0   0       0   _null_ _null_ ));
 DATA(insert ( 3988 h 1 ordered_set_transition_multi    percent_rank_final                      -   -   -   -       -       -       t f w w 0   2281    0   0       0   _null_ _null_ ));
 DATA(insert ( 3990 h 1 ordered_set_transition_multi    cume_dist_final                         -   -   -   -       -       -       t f w w 0   2281    0   0       0   _null_ _null_ ));
index b604a5c16245f0f4d3cccd9de899c270df3b854e..a68ec91c683b630f8b9b65f9df24f971435d110e 100644 (file)
@@ -698,6 +698,7 @@ extern int AggCheckCallContext(FunctionCallInfo fcinfo,
                    MemoryContext *aggcontext);
 extern fmAggrefPtr AggGetAggref(FunctionCallInfo fcinfo);
 extern MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo);
+extern bool AggStateIsShared(FunctionCallInfo fcinfo);
 extern void AggRegisterCallback(FunctionCallInfo fcinfo,
                    fmExprContextCallbackFunction func,
                    Datum arg);
index c4ea86ff05008c092c57b1d2daa9436c2623e0ff..3408cf3333e2e6f4e003673c195ce002412a13e4 100644 (file)
@@ -1866,7 +1866,7 @@ NOTICE:  avg_transfn called with 3
       2 |      6
 (1 row)
 
--- ideally these would share state, but we have to fix the OSAs first.
+-- exercise cases where OSAs share state
 select
   percentile_cont(0.5) within group (order by a),
   percentile_disc(0.5) within group (order by a)
@@ -1876,6 +1876,16 @@ from (values(1::float8),(3),(5),(7)) t(a);
                4 |               3
 (1 row)
 
+select
+  percentile_cont(0.25) within group (order by a),
+  percentile_disc(0.5) within group (order by a)
+from (values(1::float8),(3),(5),(7)) t(a);
+ percentile_cont | percentile_disc 
+-----------------+-----------------
+             2.5 |               3
+(1 row)
+
+-- these can't share state currently
 select
   rank(4) within group (order by a),
   dense_rank(4) within group (order by a)
index fefbef89e08907ded2cefd2798a129b975ec302d..55c8528fd57575a3a8f3c098df282ba2977fb245 100644 (file)
@@ -741,12 +741,18 @@ select my_avg(one) filter (where one > 1),my_sum(one) from (values(1),(3)) t(one
 -- this should not share the state due to different input columns.
 select my_avg(one),my_sum(two) from (values(1,2),(3,4)) t(one,two);
 
--- ideally these would share state, but we have to fix the OSAs first.
+-- exercise cases where OSAs share state
 select
   percentile_cont(0.5) within group (order by a),
   percentile_disc(0.5) within group (order by a)
 from (values(1::float8),(3),(5),(7)) t(a);
 
+select
+  percentile_cont(0.25) within group (order by a),
+  percentile_disc(0.5) within group (order by a)
+from (values(1::float8),(3),(5),(7)) t(a);
+
+-- these can't share state currently
 select
   rank(4) within group (order by a),
   dense_rank(4) within group (order by a)