summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPavan Deolasee2015-05-05 09:29:03 +0000
committerPavan Deolasee2015-05-05 09:29:03 +0000
commit175d1e8593ac02bd65ad0c9343d9609012ce0cda (patch)
tree2185d57da402368994291bafd144d454ecd78311 /src
parent73fa25c67cbfa24c03e28c96bf356f2592671730 (diff)
Fix int8 and numeric aggregate support post 9.4 merge.
PG 9.4 uses an internal data type to store transition function results for numeric and int8 aggregates for performance reasons. Since we need to send these transient results to coordinator for collection and finalisation, we need support for in/out functions for this internal datatype. We now do so by defining and using a new type called numeric_agg_state and implement those functions. Also fix some assorted issues in this area arising from merge
Diffstat (limited to 'src')
-rw-r--r--src/backend/executor/nodeWindowAgg.c2
-rw-r--r--src/backend/parser/parse_agg.c2
-rw-r--r--src/backend/utils/adt/numeric.c332
-rw-r--r--src/include/catalog/pg_aggregate.h56
-rw-r--r--src/include/catalog/pg_proc.h12
-rw-r--r--src/include/catalog/pg_type.h3
-rw-r--r--src/include/parser/parse_agg.h2
-rw-r--r--src/include/utils/builtins.h7
8 files changed, 310 insertions, 106 deletions
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index a3ebbee195..57de1c74fd 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -2258,10 +2258,10 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
invtransfn_oid,
finalfn_oid,
&transfnexpr,
+ &invtransfnexpr,
#ifdef XCP
&collectfnexpr,
#endif
- &invtransfnexpr,
&finalfnexpr);
/* set up infrastructure for calling the transfn(s) and finalfn */
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 9e81051893..090cba2774 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -1256,10 +1256,10 @@ build_aggregate_fnexprs(Oid *agg_input_types,
Oid invtransfn_oid,
Oid finalfn_oid,
Expr **transfnexpr,
+ Expr **invtransfnexpr,
#ifdef XCP
Expr **collectfnexpr,
#endif
- Expr **invtransfnexpr,
Expr **finalfnexpr)
{
Param *argp;
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index e53b67dedb..0ea6a33b21 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -2548,6 +2548,223 @@ makeNumericAggState(FunctionCallInfo fcinfo, bool calcSumX2)
}
/*
+ * numeric_agg_state_in() -
+ *
+ * Input function for numeric_agg_state data type
+ */
+Datum
+numeric_agg_state_in(PG_FUNCTION_ARGS)
+{
+ char *str = pstrdup(PG_GETARG_CSTRING(0));
+ NumericAggState *state;
+ char *token;
+
+ state = (NumericAggState *) palloc0(sizeof (NumericAggState));
+ init_var(&state->sumX);
+
+ token = strtok(str, ":");
+ state->calcSumX2 = (*token == 't');
+
+ token = strtok(NULL, ":");
+ state->N = DatumGetInt64(DirectFunctionCall1(int8in,CStringGetDatum(token)));
+
+ token = strtok(NULL, ":");
+ set_var_from_str(token, token, &state->sumX);
+
+ token = strtok(NULL, ":");
+ if (state->calcSumX2)
+ {
+ init_var(&state->sumX2);
+ set_var_from_str(token, token, &state->sumX2);
+ }
+
+ token = strtok(NULL, ":");
+ state->maxScale = DatumGetInt32(DirectFunctionCall1(int4in,CStringGetDatum(token)));
+
+ token = strtok(NULL, ":");
+ state->maxScaleCount = DatumGetInt64(DirectFunctionCall1(int8in,CStringGetDatum(token)));
+
+ token = strtok(NULL, ":");
+ state->NaNcount = DatumGetInt64(DirectFunctionCall1(int8in,CStringGetDatum(token)));
+
+ PG_RETURN_POINTER(state);
+}
+
+/*
+ * numeric_agg_state_out() -
+ *
+ * Output function for numeric_agg_state data type
+ */
+Datum
+numeric_agg_state_out(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state = (NumericAggState *) PG_GETARG_POINTER(0);
+ char *sumX_str, *sumX2_str, *N_str,
+ *maxScale_str, *maxScaleCount_str,
+ *NaNcount_str;
+ char *result;
+ int len;
+
+ sumX_str = get_str_from_var(&state->sumX);
+ if (state->calcSumX2)
+ sumX2_str = get_str_from_var(&state->sumX2);
+ else
+ sumX2_str = "0";
+
+ N_str = DatumGetCString(DirectFunctionCall1(int8out,
+ Int64GetDatum(state->N)));
+ maxScaleCount_str = DatumGetCString(DirectFunctionCall1(int8out,
+ Int64GetDatum(state->maxScaleCount)));
+ NaNcount_str = DatumGetCString(DirectFunctionCall1(int8out,
+ Int64GetDatum(state->NaNcount)));
+ maxScale_str = DatumGetCString(DirectFunctionCall1(int4out,
+ Int32GetDatum(state->maxScale)));
+
+ len = 1 + strlen(N_str) + strlen(sumX_str) + strlen(sumX2_str) +
+ strlen(maxScale_str) + strlen(maxScaleCount_str) +
+ strlen(NaNcount_str) + 7;
+
+ result = (char *) palloc0(len);
+
+ snprintf(result, len, "%c:%s:%s:%s:%s:%s:%s",
+ state->calcSumX2 ? 't' : 'f',
+ N_str, sumX_str, sumX2_str,
+ maxScale_str, maxScaleCount_str, NaNcount_str);
+
+ pfree(N_str);
+ pfree(sumX_str);
+ if (state->calcSumX2)
+ pfree(sumX2_str);
+ pfree(maxScale_str);
+ pfree(maxScaleCount_str);
+ pfree(NaNcount_str);
+
+ PG_RETURN_CSTRING(result);
+}
+
+/*
+ * numeric_agg_state_recv - converts binary format to numeric_agg_state
+ */
+Datum
+numeric_agg_state_recv(PG_FUNCTION_ARGS)
+{
+ StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
+ NumericAggState *state;
+ int len;
+ int i;
+
+ state = (NumericAggState *) palloc0(sizeof (NumericAggState));
+
+ state->calcSumX2 = pq_getmsgbyte(buf);
+ state->N = pq_getmsgint(buf, sizeof (int64));
+
+ len = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid length in external \"numeric\" value")));
+
+ alloc_var(&state->sumX, len);
+
+ state->sumX.weight = (int16) pq_getmsgint(buf, sizeof(int16));
+ state->sumX.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ if (!(state->sumX.sign == NUMERIC_POS ||
+ state->sumX.sign == NUMERIC_NEG ||
+ state->sumX.sign == NUMERIC_NAN))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid sign in external \"numeric\" value")));
+
+ state->sumX.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ for (i = 0; i < len; i++)
+ {
+ NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
+
+ if (d < 0 || d >= NBASE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid digit in external \"numeric\" value")));
+ state->sumX.digits[i] = d;
+ }
+
+ if (state->calcSumX2)
+ {
+ len = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ if (len < 0 || len > NUMERIC_MAX_PRECISION + NUMERIC_MAX_RESULT_SCALE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid length in external \"numeric\" value")));
+
+ alloc_var(&state->sumX2, len);
+
+ state->sumX2.weight = (int16) pq_getmsgint(buf, sizeof(int16));
+ state->sumX2.sign = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ if (!(state->sumX2.sign == NUMERIC_POS ||
+ state->sumX2.sign == NUMERIC_NEG ||
+ state->sumX2.sign == NUMERIC_NAN))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid sign in external \"numeric\" value")));
+
+ state->sumX2.dscale = (uint16) pq_getmsgint(buf, sizeof(uint16));
+ for (i = 0; i < len; i++)
+ {
+ NumericDigit d = pq_getmsgint(buf, sizeof(NumericDigit));
+
+ if (d < 0 || d >= NBASE)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+ errmsg("invalid digit in external \"numeric\" value")));
+ state->sumX2.digits[i] = d;
+ }
+ }
+ state->maxScale = pq_getmsgint(buf, sizeof (int));
+ state->maxScaleCount = pq_getmsgint(buf, sizeof (int64));
+ state->NaNcount = pq_getmsgint(buf, sizeof (int64));
+
+ PG_RETURN_POINTER(state);
+}
+
+/*
+ * numeric_agg_state_send - converts numeric_agg_state to binary format
+ */
+Datum
+numeric_agg_state_send(PG_FUNCTION_ARGS)
+{
+ NumericAggState *state = (NumericAggState *) PG_GETARG_POINTER(0);
+ StringInfoData buf;
+ int i;
+
+ pq_begintypsend(&buf);
+
+ pq_sendbyte(&buf, state->calcSumX2);
+ pq_sendint(&buf, state->N, sizeof (int64));
+
+ pq_sendint(&buf, state->sumX.ndigits, sizeof(int16));
+ pq_sendint(&buf, state->sumX.weight, sizeof(int16));
+ pq_sendint(&buf, state->sumX.sign, sizeof(int16));
+ pq_sendint(&buf, state->sumX.dscale, sizeof(int16));
+ for (i = 0; i < state->sumX.ndigits; i++)
+ pq_sendint(&buf, state->sumX.digits[i], sizeof(NumericDigit));
+
+ if (state->calcSumX2)
+ {
+ pq_sendint(&buf, state->sumX2.ndigits, sizeof(int16));
+ pq_sendint(&buf, state->sumX2.weight, sizeof(int16));
+ pq_sendint(&buf, state->sumX2.sign, sizeof(int16));
+ pq_sendint(&buf, state->sumX2.dscale, sizeof(int16));
+ for (i = 0; i < state->sumX2.ndigits; i++)
+ pq_sendint(&buf, state->sumX2.digits[i], sizeof(NumericDigit));
+ }
+
+ pq_sendint(&buf, state->maxScale, sizeof (int));
+ pq_sendint(&buf, state->maxScaleCount, sizeof (int64));
+ pq_sendint(&buf, state->NaNcount, sizeof (int64));
+
+ PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
+}
+
+/*
* Accumulate a new input value for numeric aggregate functions.
*/
static void
@@ -6635,81 +6852,52 @@ strip_var(NumericVar *var)
Datum
numeric_collect(PG_FUNCTION_ARGS)
{
- ArrayType *collectarray = PG_GETARG_ARRAYTYPE_P(0);
- ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(1);
- Datum *collectdatums;
- Datum *transdatums;
- int ndatums;
- Datum N,
- sumX,
- sumX2;
-
- /* We assume the input is array of numeric */
- deconstruct_array(collectarray,
- NUMERICOID, -1, false, 'i',
- &collectdatums, NULL, &ndatums);
- if (ndatums != 3)
- elog(ERROR, "expected 3-element numeric array");
- N = collectdatums[0];
- sumX = collectdatums[1];
- sumX2 = collectdatums[2];
-
- /* We assume the input is array of numeric */
- deconstruct_array(transarray,
- NUMERICOID, -1, false, 'i',
- &transdatums, NULL, &ndatums);
- if (ndatums != 3)
- elog(ERROR, "expected 3-element numeric array");
-
- N = DirectFunctionCall2(numeric_add, N, transdatums[0]);
- sumX = DirectFunctionCall2(numeric_add, sumX, transdatums[1]);
- sumX2 = DirectFunctionCall2(numeric_add, sumX2, transdatums[2]);
-
- collectdatums[0] = N;
- collectdatums[1] = sumX;
- collectdatums[2] = sumX2;
-
- PG_RETURN_ARRAYTYPE_P(construct_array(collectdatums, 3,
- NUMERICOID, -1, false, 'i'));
-}
+ NumericAggState *collectstate;
+ NumericAggState *transstate;
+ MemoryContext agg_context;
+ MemoryContext old_context;
-Datum
-numeric_avg_collect(PG_FUNCTION_ARGS)
-{
- ArrayType *collectarray = PG_GETARG_ARRAYTYPE_P(0);
- ArrayType *transarray = PG_GETARG_ARRAYTYPE_P(1);
- Datum *collectdatums;
- Datum *transdatums;
- int ndatums;
- Datum N,
- sumX;
-
- /* We assume the input is array of numeric */
- deconstruct_array(collectarray,
- NUMERICOID, -1, false, 'i',
- &collectdatums, NULL, &ndatums);
- if (ndatums != 2)
- elog(ERROR, "expected 2-element numeric array");
- N = collectdatums[0];
- sumX = collectdatums[1];
-
- /* We assume the input is array of numeric */
- deconstruct_array(transarray,
- NUMERICOID, -1, false, 'i',
- &transdatums, NULL, &ndatums);
- if (ndatums != 2)
- elog(ERROR, "expected 2-element numeric array");
-
- N = DirectFunctionCall2(numeric_add, N, transdatums[0]);
- sumX = DirectFunctionCall2(numeric_add, sumX, transdatums[1]);
-
- collectdatums[0] = N;
- collectdatums[1] = sumX;
-
- PG_RETURN_ARRAYTYPE_P(construct_array(collectdatums, 2,
- NUMERICOID, -1, false, 'i'));
+ if (!AggCheckCallContext(fcinfo, &agg_context))
+ elog(ERROR, "aggregate function called in non-aggregate context");
+
+ old_context = MemoryContextSwitchTo(agg_context);
+
+ collectstate = PG_ARGISNULL(0) ? NULL : (NumericAggState *) PG_GETARG_POINTER(0);
+
+ if (collectstate == NULL)
+ {
+ collectstate = (NumericAggState *) palloc0(sizeof (NumericAggState));
+ init_var(&collectstate->sumX);
+ init_var(&collectstate->sumX2);
+ }
+
+ transstate = PG_ARGISNULL(1) ? NULL : (NumericAggState *) PG_GETARG_POINTER(1);
+
+ if (transstate == NULL)
+ PG_RETURN_POINTER(collectstate);
+
+ Assert(collectstate->calcSumX2 == transstate->calcSumX2);
+
+ collectstate->N += transstate->N;
+ add_var(&collectstate->sumX, &transstate->sumX, &collectstate->sumX);
+ if (collectstate->calcSumX2)
+ add_var(&collectstate->sumX2, &transstate->sumX2, &collectstate->sumX2);
+ collectstate->NaNcount += transstate->NaNcount;
+
+ if (collectstate->maxScale < transstate->maxScale)
+ {
+ collectstate->maxScale = transstate->maxScale;
+ collectstate->maxScaleCount = transstate->maxScaleCount;
+ }
+ else if (collectstate->maxScale == transstate->maxScale)
+ collectstate->maxScaleCount += transstate->maxScaleCount;
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_POINTER(collectstate);
}
+
Datum
int8_avg_collect(PG_FUNCTION_ARGS)
{
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index a960bd9b29..30dca9c440 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -150,23 +150,23 @@ typedef FormData_pg_aggregate *Form_pg_aggregate;
*/
/* avg */
-DATA(insert ( 2100 n 0 int8_avg_accum numeric_avg_collect numeric_avg int8_avg_accum int8_accum_inv numeric_avg f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2100 n 0 int8_avg_accum numeric_collect numeric_avg int8_avg_accum int8_accum_inv numeric_avg f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2101 n 0 int4_avg_accum int8_avg_collect int8_avg int4_avg_accum int4_avg_accum_inv int8_avg f f 0 1016 1016 0 1016 0 "{0,0}" "{0,0}" "{0,0}" ));
DATA(insert ( 2102 n 0 int2_avg_accum int8_avg_collect int8_avg int2_avg_accum int2_avg_accum_inv int8_avg f f 0 1016 1016 0 1016 0 "{0,0}" "{0,0}" "{0,0}" ));
-DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg_collect numeric_avg numeric_avg_accum numeric_accum_inv numeric_avg f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2103 n 0 numeric_avg_accum numeric_collect numeric_avg numeric_avg_accum numeric_accum_inv numeric_avg f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2104 n 0 float4_accum float8_collect float8_avg - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2105 n 0 float8_accum float8_collect float8_avg - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2106 n 0 interval_accum interval_collect interval_avg interval_accum interval_accum_inv interval_avg f f 0 1187 1187 0 1187 0 "{0 second,0 second}" "{0 second,0 second}" "{0 second,0 second}" ));
/* sum */
-DATA(insert ( 2107 n 0 int8_avg_accum numeric_add numeric_sum int8_avg_accum int8_accum_inv numeric_sum f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2107 n 0 int8_avg_accum numeric_collect numeric_sum int8_avg_accum int8_accum_inv numeric_sum f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2108 n 0 int4_sum int8_sum_to_int8 - int4_avg_accum int4_avg_accum_inv int2int4_sum f f 0 20 20 0 1016 0 _null_ _null_ "{0,0}" ));
DATA(insert ( 2109 n 0 int2_sum int8_sum_to_int8 - int2_avg_accum int2_avg_accum_inv int2int4_sum f f 0 20 20 0 1016 0 _null_ _null_ "{0,0}" ));
DATA(insert ( 2110 n 0 float4pl float4pl - - - - f f 0 700 700 0 0 0 _null_ _null_ _null_ ));
DATA(insert ( 2111 n 0 float8pl float8pl - - - - f f 0 701 701 0 0 0 _null_ _null_ _null_ ));
DATA(insert ( 2112 n 0 cash_pl cash_pl - cash_pl cash_mi - f f 0 790 0 790 790 0 _null_ _null_ _null_ ));
DATA(insert ( 2113 n 0 interval_pl interval_pl - interval_pl interval_mi - f f 0 1186 1186 0 1186 0 _null_ _null_ _null_ ));
-DATA(insert ( 2114 n 0 numeric_avg_accum numeric_add numeric_sum numeric_avg_accum numeric_accum_inv numeric_sum f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2114 n 0 numeric_avg_accum numeric_collect numeric_sum numeric_avg_accum numeric_accum_inv numeric_sum f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
/* max */
DATA(insert ( 2115 n 0 int8larger int8larger - - - - f f 413 20 20 0 0 0 _null_ _null_ _null_ ));
@@ -217,52 +217,52 @@ DATA(insert ( 2147 n 0 int8inc_any int8_sum_to_int8 - int8inc_any int8dec_a
DATA(insert ( 2803 n 0 int8inc int8_sum_to_int8 - int8inc int8dec - f f 0 20 20 0 20 0 "0" _null_ "0" ));
/* var_pop */
-DATA(insert ( 2718 n 0 int8_accum numeric_collect numeric_var_pop int8_accum int8_accum_inv numeric_var_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2719 n 0 int4_accum numeric_collect numeric_var_pop int4_accum int4_accum_inv numeric_var_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2720 n 0 int2_accum numeric_collect numeric_var_pop int2_accum int2_accum_inv numeric_var_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2718 n 0 int8_accum numeric_collect numeric_var_pop int8_accum int8_accum_inv numeric_var_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2719 n 0 int4_accum numeric_collect numeric_var_pop int4_accum int4_accum_inv numeric_var_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2720 n 0 int2_accum numeric_collect numeric_var_pop int2_accum int2_accum_inv numeric_var_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2721 n 0 float4_accum float8_collect float8_var_pop - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2722 n 0 float8_accum float8_collect float8_var_pop - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
-DATA(insert ( 2723 n 0 numeric_accum numeric_collect numeric_var_pop numeric_accum numeric_accum_inv numeric_var_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2723 n 0 numeric_accum numeric_collect numeric_var_pop numeric_accum numeric_accum_inv numeric_var_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
/* var_samp */
-DATA(insert ( 2641 n 0 int8_accum numeric_collect numeric_var_samp int8_accum int8_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2642 n 0 int4_accum numeric_collect numeric_var_samp int4_accum int4_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2643 n 0 int2_accum numeric_collect numeric_var_samp int2_accum int2_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2641 n 0 int8_accum numeric_collect numeric_var_samp int8_accum int8_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2642 n 0 int4_accum numeric_collect numeric_var_samp int4_accum int4_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2643 n 0 int2_accum numeric_collect numeric_var_samp int2_accum int2_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2644 n 0 float4_accum float8_collect float8_var_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2645 n 0 float8_accum float8_collect float8_var_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
-DATA(insert ( 2646 n 0 numeric_accum numeric_collect numeric_var_samp numeric_accum numeric_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2646 n 0 numeric_accum numeric_collect numeric_var_samp numeric_accum numeric_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
/* variance: historical Postgres syntax for var_samp */
-DATA(insert ( 2148 n 0 int8_accum numeric_collect numeric_var_samp int8_accum int8_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2149 n 0 int4_accum numeric_collect numeric_var_samp int4_accum int4_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2150 n 0 int2_accum numeric_collect numeric_var_samp int2_accum int2_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2148 n 0 int8_accum numeric_collect numeric_var_samp int8_accum int8_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2149 n 0 int4_accum numeric_collect numeric_var_samp int4_accum int4_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2150 n 0 int2_accum numeric_collect numeric_var_samp int2_accum int2_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2151 n 0 float4_accum float8_collect float8_var_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2152 n 0 float8_accum float8_collect float8_var_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
-DATA(insert ( 2153 n 0 numeric_accum numeric_collect numeric_var_samp numeric_accum numeric_accum_inv numeric_var_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2153 n 0 numeric_accum numeric_collect numeric_var_samp numeric_accum numeric_accum_inv numeric_var_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
/* stddev_pop */
-DATA(insert ( 2724 n 0 int8_accum numeric_collect numeric_stddev_pop int8_accum int8_accum_inv numeric_stddev_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2725 n 0 int4_accum numeric_collect numeric_stddev_pop int4_accum int4_accum_inv numeric_stddev_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2726 n 0 int2_accum numeric_collect numeric_stddev_pop int2_accum int2_accum_inv numeric_stddev_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2724 n 0 int8_accum numeric_collect numeric_stddev_pop int8_accum int8_accum_inv numeric_stddev_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2725 n 0 int4_accum numeric_collect numeric_stddev_pop int4_accum int4_accum_inv numeric_stddev_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2726 n 0 int2_accum numeric_collect numeric_stddev_pop int2_accum int2_accum_inv numeric_stddev_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2727 n 0 float4_accum float8_collect float8_stddev_pop - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2728 n 0 float8_accum float8_collect float8_stddev_pop - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
-DATA(insert ( 2729 n 0 numeric_accum numeric_collect numeric_stddev_pop numeric_accum numeric_accum_inv numeric_stddev_pop f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2729 n 0 numeric_accum numeric_collect numeric_stddev_pop numeric_accum numeric_accum_inv numeric_stddev_pop f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
/* stddev_samp */
-DATA(insert ( 2712 n 0 int8_accum numeric_collect numeric_stddev_samp int8_accum int8_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2713 n 0 int4_accum numeric_collect numeric_stddev_samp int4_accum int4_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2714 n 0 int2_accum numeric_collect numeric_stddev_samp int2_accum int2_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2712 n 0 int8_accum numeric_collect numeric_stddev_samp int8_accum int8_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2713 n 0 int4_accum numeric_collect numeric_stddev_samp int4_accum int4_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2714 n 0 int2_accum numeric_collect numeric_stddev_samp int2_accum int2_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2715 n 0 float4_accum float8_collect float8_stddev_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2716 n 0 float8_accum float8_collect float8_stddev_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
-DATA(insert ( 2717 n 0 numeric_accum numeric_collect numeric_stddev_samp numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2717 n 0 numeric_accum numeric_collect numeric_stddev_samp numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
/* stddev: historical Postgres syntax for stddev_samp */
-DATA(insert ( 2154 n 0 int8_accum numeric_collect numeric_stddev_samp int8_accum int8_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2155 n 0 int4_accum numeric_collect numeric_stddev_samp int4_accum int4_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
-DATA(insert ( 2156 n 0 int2_accum numeric_collect numeric_stddev_samp int2_accum int2_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2154 n 0 int8_accum numeric_collect numeric_stddev_samp int8_accum int8_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2155 n 0 int4_accum numeric_collect numeric_stddev_samp int4_accum int4_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
+DATA(insert ( 2156 n 0 int2_accum numeric_collect numeric_stddev_samp int2_accum int2_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
DATA(insert ( 2157 n 0 float4_accum float8_collect float8_stddev_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
DATA(insert ( 2158 n 0 float8_accum float8_collect float8_stddev_samp - - - f f 0 1022 1022 0 0 0 "{0,0,0}" "{0,0,0}" _null_ ));
-DATA(insert ( 2159 n 0 numeric_accum numeric_collect numeric_stddev_samp numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 2281 2281 128 2281 128 _null_ _null_ _null_ ));
+DATA(insert ( 2159 n 0 numeric_accum numeric_collect numeric_stddev_samp numeric_accum numeric_accum_inv numeric_stddev_samp f f 0 6013 6013 128 6013 128 _null_ _null_ _null_ ));
/* SQL2003 binary regression aggregates */
DATA(insert ( 2818 n 0 int8inc_float8_float8 int8_sum_to_int8 - - - - f f 0 20 20 0 0 0 "0" _null_ _null_ ));
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 2eed6e9d7e..8f681abe74 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -2518,9 +2518,7 @@ DESCR("aggregate transition function");
#ifdef PGXC
DATA(insert OID = 6000 ( float8_collect PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 1022 "1022 1022" _null_ _null_ _null_ _null_ float8_collect _null_ _null_ _null_ ));
DESCR("aggregate collection function");
-DATA(insert OID = 6001 ( numeric_avg_collect PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 1231 "1231 1231" _null_ _null_ _null_ _null_ numeric_avg_collect _null_ _null_ _null_ ));
-DESCR("aggregate collection function");
-DATA(insert OID = 6002 ( numeric_collect PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 1231 "1231 1231" _null_ _null_ _null_ _null_ numeric_collect _null_ _null_ _null_ ));
+DATA(insert OID = 6002 ( numeric_collect PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 6013 "6013 6013" _null_ _null_ _null_ _null_ numeric_collect _null_ _null_ _null_ ));
DESCR("aggregate collection function");
DATA(insert OID = 6003 ( interval_collect PGNSP PGUID 12 1 0 0 0 f f f f t f i 2 0 1187 "1187 1187" _null_ _null_ _null_ _null_ interval_collect _null_ _null_ _null_ ));
DESCR("aggregate transition function");
@@ -5071,6 +5069,14 @@ DESCR("lock the cluster for taking backup");
DATA(insert OID = 6012 ( stormdb_promote_standby PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ stormdb_promote_standby _null_ _null_ _null_ ));
DESCR("touch trigger file on a standby machine to end replication");
#endif
+DATA(insert OID = 6014 ( numeric_agg_state_in PGNSP PGUID 12 1 0 0 0 f f f f t f i 3 0 1700 "6013" _null_ _null_ _null_ _null_ numeric_agg_state_in _null_ _null_ _null_ ));
+DESCR("I/O");
+DATA(insert OID = 6015 ( numeric_agg_state_out PGNSP PGUID 12 1 0 0 0 f f f f t f i 1 0 6013 "1700" _null_ _null_ _null_ _null_ numeric_agg_state_out _null_ _null_ _null_ ));
+DESCR("I/O");
+DATA(insert OID = 6016 ( numeric_agg_state_recv PGNSP PGUID 12 1 0 0 0 f f f f t f i 3 0 6013 "2281" _null_ _null_ _null_ _null_ numeric_agg_state_recv _null_ _null_ _null_ ));
+DESCR("I/O");
+DATA(insert OID = 6017 ( numeric_agg_state_send PGNSP PGUID 12 1 0 0 0 f f f f t f i 1 0 17 "6013" _null_ _null_ _null_ _null_ numeric_agg_state_send _null_ _null_ _null_ ));
+DESCR("I/O");
#endif
/*
diff --git a/src/include/catalog/pg_type.h b/src/include/catalog/pg_type.h
index 756c161e0c..349537116d 100644
--- a/src/include/catalog/pg_type.h
+++ b/src/include/catalog/pg_type.h
@@ -688,6 +688,9 @@ DATA(insert OID = 3115 ( fdw_handler PGNSP PGUID 4 t p P f t \054 0 0 0 fdw_han
DATA(insert OID = 3831 ( anyrange PGNSP PGUID -1 f p P f t \054 0 0 0 anyrange_in anyrange_out - - - - - d x f 0 -1 0 0 _null_ _null_ _null_ ));
#define ANYRANGEOID 3831
+DATA(insert OID = 6013 ( numeric_agg_state PGNSP PGUID SIZEOF_POINTER t c C f t \054 0 0 0 numeric_agg_state_in numeric_agg_state_out numeric_agg_state_recv numeric_agg_state_send - - - ALIGNOF_POINTER p f 0 -1 0 0 _null_ _null_ _null_ ));
+DESCR("numeric_agg_state - internal type used for numeric/int8 aggregation");
+#define NUMERIC_AGG_STATE_OID 6013
/*
* macros
diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h
index ef7c1fb3f9..fbac35bb76 100644
--- a/src/include/parser/parse_agg.h
+++ b/src/include/parser/parse_agg.h
@@ -53,10 +53,10 @@ extern void build_aggregate_fnexprs(Oid *agg_input_types,
Oid invtransfn_oid,
Oid finalfn_oid,
Expr **transfnexpr,
+ Expr **invtransfnexpr,
#ifdef XCP
Expr **collectfnexpr,
#endif
- Expr **invtransfnexpr,
Expr **finalfnexpr);
#endif /* PARSE_AGG_H */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index b36b5af08f..17eea7af86 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1081,6 +1081,13 @@ extern Datum int2int4_sum(PG_FUNCTION_ARGS);
extern Datum width_bucket_numeric(PG_FUNCTION_ARGS);
extern Datum hash_numeric(PG_FUNCTION_ARGS);
+#ifdef PGXC
+extern Datum numeric_agg_state_in(PG_FUNCTION_ARGS);
+extern Datum numeric_agg_state_out(PG_FUNCTION_ARGS);
+extern Datum numeric_agg_state_recv(PG_FUNCTION_ARGS);
+extern Datum numeric_agg_state_send(PG_FUNCTION_ARGS);
+#endif
+
/* ri_triggers.c */
extern Datum RI_FKey_check_ins(PG_FUNCTION_ARGS);
extern Datum RI_FKey_check_upd(PG_FUNCTION_ARGS);