Reimplement NUMERIC datatype using base-10000 arithmetic; also improve
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 21 Mar 2003 01:58:05 +0000 (01:58 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 21 Mar 2003 01:58:05 +0000 (01:58 +0000)
some of the algorithms for higher functions.  I see about a factor of ten
speedup on the 'numeric' regression test, but it's unlikely that that test
is representative of real-world applications.
initdb forced due to change of on-disk representation for NUMERIC.

doc/src/sgml/release.sgml
src/backend/utils/adt/numeric.c
src/include/catalog/catversion.h
src/include/utils/numeric.h
src/test/regress/expected/aggregates.out
src/test/regress/expected/numeric.out

index b4eabbcb7776e49241bf6eefa46e08fdeed047ff..654b8b53c089a37f676dd9d746f353b8349be6eb 100644 (file)
@@ -1,5 +1,5 @@
 <!--
-$Header: /cvsroot/pgsql/doc/src/sgml/release.sgml,v 1.185 2003/02/05 17:41:32 tgl Exp $
+$Header: /cvsroot/pgsql/doc/src/sgml/release.sgml,v 1.186 2003/03/21 01:58:04 tgl Exp $
 -->
 
 <appendix id="release">
@@ -24,6 +24,7 @@ CDATA means the content is "SGML-free", so you can write without
 worries about funny characters.
 -->
 <literallayout><![CDATA[
+Reimplementation of NUMERIC datatype for more speed
 New regular expression package, many more regexp features (most of Perl5)
 Can now do EXPLAIN ... EXECUTE to see plan used for a prepared query
 Explicit JOINs no longer constrain query plan, unless JOIN_COLLAPSE_LIMIT = 1
index 2214f570d6d77a5b6e27bfe7766f36422f8bf32e..e6698fea6c03c322c9e17b008cc68d7728babbc0 100644 (file)
@@ -1,19 +1,29 @@
-/* ----------
+/*-------------------------------------------------------------------------
+ *
  * numeric.c
+ *   An exact numeric data type for the Postgres database system
  *
- * An exact numeric data type for the Postgres database system
+ * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
  *
- * 1998 Jan Wieck
+ * Many of the algorithmic ideas are borrowed from David M. Smith's "FM"
+ * multiple-precision math library, most recently published as Algorithm
+ * 786: Multiple-Precision Complex Arithmetic and Functions, ACM
+ * Transactions on Mathematical Software, Vol. 24, No. 4, December 1998,
+ * pages 359-367.
  *
- * $Header: /cvsroot/pgsql/src/backend/utils/adt/numeric.c,v 1.58 2003/03/14 00:15:32 tgl Exp $
+ * Copyright (c) 1998-2003, PostgreSQL Global Development Group
  *
- * ----------
+ * IDENTIFICATION
+ *   $Header: /cvsroot/pgsql/src/backend/utils/adt/numeric.c,v 1.59 2003/03/21 01:58:04 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
 #include <ctype.h>
 #include <float.h>
+#include <limits.h>
 #include <math.h>
 #include <errno.h>
 
 /* ----------
  * Local data types
  *
- * Note: the first digit of a NumericVar's value is assumed to be multiplied
- * by 10 ** weight.  Another way to say it is that there are weight+1 digits
- * before the decimal point.  It is possible to have weight < 0.
- *
+ * Numeric values are represented in a base-NBASE floating point format.
+ * Each "digit" ranges from 0 to NBASE-1.  The type NumericDigit is signed
+ * and wide enough to store a digit.  We assume that NBASE*NBASE can fit in
+ * an int.  Although the purely calculational routines could handle any even
+ * NBASE that's less than sqrt(INT_MAX), in practice we are only interested
+ * in NBASE a power of ten, so that I/O conversions and decimal rounding
+ * are easy.  Also, it's actually more efficient if NBASE is rather less than
+ * sqrt(INT_MAX), so that there is "headroom" for mul_var and div_var to
+ * postpone processing carries.
+ * ----------
+ */
+
+#if 0
+#define NBASE      10
+#define HALF_NBASE 5
+#define DEC_DIGITS 1           /* decimal digits per NBASE digit */
+#define MUL_GUARD_DIGITS   4   /* these are measured in NBASE digits */
+#define DIV_GUARD_DIGITS   8
+
+typedef signed char NumericDigit;
+#endif
+
+#if 0
+#define NBASE      100
+#define HALF_NBASE 50
+#define DEC_DIGITS 2           /* decimal digits per NBASE digit */
+#define MUL_GUARD_DIGITS   3   /* these are measured in NBASE digits */
+#define DIV_GUARD_DIGITS   6
+
+typedef signed char NumericDigit;
+#endif
+
+#if 1
+#define NBASE      10000
+#define HALF_NBASE 5000
+#define DEC_DIGITS 4           /* decimal digits per NBASE digit */
+#define MUL_GUARD_DIGITS   2   /* these are measured in NBASE digits */
+#define DIV_GUARD_DIGITS   4
+
+typedef int16 NumericDigit;
+#endif
+
+
+/* ----------
  * The value represented by a NumericVar is determined by the sign, weight,
- * ndigits, and digits[] array.  The rscale and dscale are carried along,
- * but they are just auxiliary information until rounding is done before
- * final storage or display.  (Scales are the number of digits wanted
- * *after* the decimal point.  Scales are always >= 0.)
+ * ndigits, and digits[] array.
+ * Note: the first digit of a NumericVar's value is assumed to be multiplied
+ * by NBASE ** weight.  Another way to say it is that there are weight+1
+ * digits before the decimal point.  It is possible to have weight < 0.
  *
  * buf points at the physical start of the palloc'd digit buffer for the
  * NumericVar. digits points at the first digit in actual use (the one
- * with the specified weight). We normally leave an unused byte or two
+ * with the specified weight). We normally leave an unused digit or two
  * (preset to zeroes) between buf and digits, so that there is room to store
  * a carry out of the top digit without special pushups.  We just need to
  * decrement digits (and increment weight) to make room for the carry digit.
+ * (There is no such extra space in a numeric value stored in the database,
+ * only in a NumericVar in memory.)
  *
  * If buf is NULL then the digit buffer isn't actually palloc'd and should
  * not be freed --- see the constants below for an example.
  *
+ * dscale, or display scale, is the nominal precision expressed as number
+ * of digits after the decimal point (it must always be >= 0 at present).
+ * dscale may be more than the number of physically stored fractional digits,
+ * implying that we have suppressed storage of significant trailing zeroes.
+ * It should never be less than the number of stored digits, since that would
+ * imply hiding digits that are present.  NOTE that dscale is always expressed
+ * in *decimal* digits, and so it may correspond to a fractional number of
+ * base-NBASE digits --- divide by DEC_DIGITS to convert to NBASE digits.
+ *
+ * rscale, or result scale, is the target precision for a computation.
+ * Like dscale it is expressed as number of *decimal* digits after the decimal
+ * point, and is always >= 0 at present.
+ * Note that rscale is not stored in variables --- it's figured on-the-fly
+ * from the dscales of the inputs.
+ *
  * NB: All the variable-level functions are written in a style that makes it
  * possible to give one and the same variable as argument and destination.
  * This is feasible because the digit buffer is separate from the variable.
  * ----------
  */
-typedef unsigned char NumericDigit;
-
 typedef struct NumericVar
 {
-   int         ndigits;        /* number of digits in digits[] - can be
-                                * 0! */
+   int         ndigits;        /* # of digits in digits[] - can be 0! */
    int         weight;         /* weight of first digit */
-   int         rscale;         /* result scale */
-   int         dscale;         /* display scale */
    int         sign;           /* NUMERIC_POS, NUMERIC_NEG, or
                                 * NUMERIC_NAN */
+   int         dscale;         /* display scale */
    NumericDigit *buf;          /* start of palloc'd space for digits[] */
-   NumericDigit *digits;       /* decimal digits */
+   NumericDigit *digits;       /* base-NBASE digits */
 } NumericVar;
 
 
 /* ----------
- * Local data
- * ----------
- */
-static int global_rscale = 0;
-
-/* ----------
- * Some preinitialized variables we need often
+ * Some preinitialized constants
  * ----------
  */
 static NumericDigit const_zero_data[1] = {0};
 static NumericVar const_zero =
-{0, 0, 0, 0, NUMERIC_POS, NULL, const_zero_data};
+{0, 0, NUMERIC_POS, 0, NULL, const_zero_data};
 
 static NumericDigit const_one_data[1] = {1};
 static NumericVar const_one =
-{1, 0, 0, 0, NUMERIC_POS, NULL, const_one_data};
+{1, 0, NUMERIC_POS, 0, NULL, const_one_data};
 
 static NumericDigit const_two_data[1] = {2};
 static NumericVar const_two =
-{1, 0, 0, 0, NUMERIC_POS, NULL, const_two_data};
-
-static NumericDigit const_zero_point_one_data[1] = {1};
-static NumericVar const_zero_point_one =
-{1, -1, 1, 1, NUMERIC_POS, NULL, const_zero_point_one_data};
-
+{1, 0, NUMERIC_POS, 0, NULL, const_two_data};
+
+#if DEC_DIGITS == 4
+static NumericDigit const_zero_point_five_data[1] = {5000};
+#elif DEC_DIGITS == 2
+static NumericDigit const_zero_point_five_data[1] = {50};
+#elif DEC_DIGITS == 1
+static NumericDigit const_zero_point_five_data[1] = {5};
+#endif
+static NumericVar const_zero_point_five =
+{1, -1, NUMERIC_POS, 1, NULL, const_zero_point_five_data};
+
+#if DEC_DIGITS == 4
+static NumericDigit const_zero_point_nine_data[1] = {9000};
+#elif DEC_DIGITS == 2
+static NumericDigit const_zero_point_nine_data[1] = {90};
+#elif DEC_DIGITS == 1
 static NumericDigit const_zero_point_nine_data[1] = {9};
+#endif
 static NumericVar const_zero_point_nine =
-{1, -1, 1, 1, NUMERIC_POS, NULL, const_zero_point_nine_data};
+{1, -1, NUMERIC_POS, 1, NULL, const_zero_point_nine_data};
+
+#if DEC_DIGITS == 4
+static NumericDigit const_zero_point_01_data[1] = {100};
+static NumericVar const_zero_point_01 =
+{1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
+#elif DEC_DIGITS == 2
+static NumericDigit const_zero_point_01_data[1] = {1};
+static NumericVar const_zero_point_01 =
+{1, -1, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
+#elif DEC_DIGITS == 1
+static NumericDigit const_zero_point_01_data[1] = {1};
+static NumericVar const_zero_point_01 =
+{1, -2, NUMERIC_POS, 2, NULL, const_zero_point_01_data};
+#endif
 
+#if DEC_DIGITS == 4
+static NumericDigit const_one_point_one_data[2] = {1, 1000};
+#elif DEC_DIGITS == 2
+static NumericDigit const_one_point_one_data[2] = {1, 10};
+#elif DEC_DIGITS == 1
 static NumericDigit const_one_point_one_data[2] = {1, 1};
+#endif
 static NumericVar const_one_point_one =
-{2, 0, 1, 1, NUMERIC_POS, NULL, const_one_point_one_data};
+{2, 0, NUMERIC_POS, 1, NULL, const_one_point_one_data};
 
 static NumericVar const_nan =
-{0, 0, 0, 0, NUMERIC_NAN, NULL, NULL};
+{0, 0, NUMERIC_NAN, 0, NULL, NULL};
 
+#if DEC_DIGITS == 4
+static const int   round_powers[4] = { 0, 1000, 100, 10 };
+#endif
 
 
 /* ----------
@@ -129,27 +221,29 @@ static NumericVar const_nan =
  */
 
 #ifdef NUMERIC_DEBUG
-static void dump_numeric(char *str, Numeric num);
-static void dump_var(char *str, NumericVar *var);
+static void dump_numeric(const char *str, Numeric num);
+static void dump_var(const char *str, NumericVar *var);
 
 #else
 #define dump_numeric(s,n)
 #define dump_var(s,v)
 #endif
 
-#define digitbuf_alloc(size)  ((NumericDigit *) palloc(size))
+#define digitbuf_alloc(ndigits)  \
+   ((NumericDigit *) palloc((ndigits) * sizeof(NumericDigit)))
 #define digitbuf_free(buf) \
    do { \
         if ((buf) != NULL) \
             pfree(buf); \
    } while (0)
 
-#define init_var(v)        memset(v,0,sizeof(NumericVar))
+#define init_var(v)        MemSetAligned(v, 0, sizeof(NumericVar))
+
 static void alloc_var(NumericVar *var, int ndigits);
 static void free_var(NumericVar *var);
 static void zero_var(NumericVar *var);
 
-static void set_var_from_str(char *str, NumericVar *dest);
+static void set_var_from_str(const char *str, NumericVar *dest);
 static void set_var_from_num(Numeric value, NumericVar *dest);
 static void set_var_from_var(NumericVar *value, NumericVar *dest);
 static char *get_str_from_var(NumericVar *var, int dscale);
@@ -158,6 +252,8 @@ static Numeric make_result(NumericVar *var);
 
 static void apply_typmod(NumericVar *var, int32 typmod);
 
+static bool numericvar_to_int8(NumericVar *var, int64 *result);
+static void int8_to_numericvar(int64 val, NumericVar *var);
 static double numeric_to_double_no_overflow(Numeric num);
 static double numericvar_to_double_no_overflow(NumericVar *var);
 
@@ -165,23 +261,30 @@ static int    cmp_numerics(Numeric num1, Numeric num2);
 static int cmp_var(NumericVar *var1, NumericVar *var2);
 static void add_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
 static void sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
-static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
-static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
+static void mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+                   int rscale);
+static void div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+                   int rscale);
 static int select_div_scale(NumericVar *var1, NumericVar *var2);
 static void mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result);
 static void ceil_var(NumericVar *var, NumericVar *result);
 static void floor_var(NumericVar *var, NumericVar *result);
 
-static void sqrt_var(NumericVar *arg, NumericVar *result);
-static void exp_var(NumericVar *arg, NumericVar *result);
-static void ln_var(NumericVar *arg, NumericVar *result);
+static void sqrt_var(NumericVar *arg, NumericVar *result, int rscale);
+static void exp_var(NumericVar *arg, NumericVar *result, int rscale);
+static void exp_var_internal(NumericVar *arg, NumericVar *result, int rscale);
+static void ln_var(NumericVar *arg, NumericVar *result, int rscale);
 static void log_var(NumericVar *base, NumericVar *num, NumericVar *result);
 static void power_var(NumericVar *base, NumericVar *exp, NumericVar *result);
+static void power_var_int(NumericVar *base, int exp, NumericVar *result,
+                         int rscale);
 
 static int cmp_abs(NumericVar *var1, NumericVar *var2);
 static void add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
 static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
-
+static void round_var(NumericVar *var, int rscale);
+static void trunc_var(NumericVar *var, int rscale);
+static void strip_var(NumericVar *var);
 
 
 /* ----------------------------------------------------------------------
@@ -192,11 +295,10 @@ static void sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result);
  */
 
 
-/* ----------
+/*
  * numeric_in() -
  *
  * Input function for numeric data type
- * ----------
  */
 Datum
 numeric_in(PG_FUNCTION_ARGS)
@@ -232,11 +334,10 @@ numeric_in(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_out() -
  *
  * Output function for numeric data type
- * ----------
  */
 Datum
 numeric_out(PG_FUNCTION_ARGS)
@@ -270,13 +371,12 @@ numeric_out(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric() -
  *
  * This is a special function called by the Postgres database system
- * before a value is stored in a tuples attribute. The precision and
+ * before a value is stored in a tuple's attribute. The precision and
  * scale of the attribute have to be applied on the value.
- * ----------
  */
 Datum
 numeric(PG_FUNCTION_ARGS)
@@ -287,7 +387,8 @@ numeric(PG_FUNCTION_ARGS)
    int32       tmp_typmod;
    int         precision;
    int         scale;
-   int         maxweight;
+   int         ddigits;
+   int         maxdigits;
    NumericVar  var;
 
    /*
@@ -313,18 +414,19 @@ numeric(PG_FUNCTION_ARGS)
    tmp_typmod = typmod - VARHDRSZ;
    precision = (tmp_typmod >> 16) & 0xffff;
    scale = tmp_typmod & 0xffff;
-   maxweight = precision - scale;
+   maxdigits = precision - scale;
 
    /*
-    * If the number is in bounds and due to the present result scale no
+    * If the number is certainly in bounds and due to the target scale no
     * rounding could be necessary, just make a copy of the input and
-    * modify its scale fields.
+    * modify its scale fields.  (Note we assume the existing dscale is
+    * honest...)
     */
-   if (num->n_weight < maxweight && scale >= num->n_rscale)
+   ddigits = (num->n_weight + 1) * DEC_DIGITS;
+   if (ddigits <= maxdigits && scale >= NUMERIC_DSCALE(num))
    {
        new = (Numeric) palloc(num->varlen);
        memcpy(new, num, num->varlen);
-       new->n_rscale = scale;
        new->n_sign_dscale = NUMERIC_SIGN(new) |
            ((uint16) scale & NUMERIC_DSCALE_MASK);
        PG_RETURN_NUMERIC(new);
@@ -425,12 +527,11 @@ numeric_uplus(PG_FUNCTION_ARGS)
    PG_RETURN_NUMERIC(res);
 }
 
-/* ----------
+/*
  * numeric_sign() -
  *
  * returns -1 if the argument is less than 0, 0 if the argument is equal
  * to 0, and 1 if the argument is greater than zero.
- * ----------
  */
 Datum
 numeric_sign(PG_FUNCTION_ARGS)
@@ -471,13 +572,12 @@ numeric_sign(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_round() -
  *
  * Round a value to have 'scale' digits after the decimal point.
  * We allow negative 'scale', implying rounding before the decimal
  * point --- Oracle interprets rounding that way.
- * ----------
  */
 Datum
 numeric_round(PG_FUNCTION_ARGS)
@@ -486,7 +586,6 @@ numeric_round(PG_FUNCTION_ARGS)
    int32       scale = PG_GETARG_INT32(1);
    Numeric     res;
    NumericVar  arg;
-   int         i;
 
    /*
     * Handle NaN
@@ -496,7 +595,6 @@ numeric_round(PG_FUNCTION_ARGS)
 
    /*
     * Limit the scale value to avoid possible overflow in calculations
-    * below.
     */
    scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
    scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
@@ -507,46 +605,11 @@ numeric_round(PG_FUNCTION_ARGS)
    init_var(&arg);
    set_var_from_num(num, &arg);
 
-   i = arg.weight + scale + 1;
-
-   if (i < arg.ndigits)
-   {
-       /*
-        * If i = 0, the value loses all digits, but could round up if its
-        * first digit is more than 4.  If i < 0 the result must be 0.
-        */
-       if (i < 0)
-           arg.ndigits = 0;
-       else
-       {
-           int         carry = (arg.digits[i] > 4) ? 1 : 0;
-
-           arg.ndigits = i;
-
-           while (carry)
-           {
-               carry += arg.digits[--i];
-               arg.digits[i] = carry % 10;
-               carry /= 10;
-           }
-
-           if (i < 0)
-           {
-               Assert(i == -1);    /* better not have added more than 1 digit */
-               Assert(arg.digits > arg.buf);
-               arg.digits--;
-               arg.ndigits++;
-               arg.weight++;
-           }
-       }
-   }
+   round_var(&arg, scale);
 
-   /*
-    * Set result's scale to something reasonable.
-    */
-   scale = Min(NUMERIC_MAX_DISPLAY_SCALE, Max(0, scale));
-   arg.rscale = scale;
-   arg.dscale = scale;
+   /* We don't allow negative output dscale */
+   if (scale < 0)
+       arg.dscale = 0;
 
    /*
     * Return the rounded result
@@ -558,13 +621,12 @@ numeric_round(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_trunc() -
  *
  * Truncate a value to have 'scale' digits after the decimal point.
  * We allow negative 'scale', implying a truncation before the decimal
  * point --- Oracle interprets truncation that way.
- * ----------
  */
 Datum
 numeric_trunc(PG_FUNCTION_ARGS)
@@ -582,7 +644,6 @@ numeric_trunc(PG_FUNCTION_ARGS)
 
    /*
     * Limit the scale value to avoid possible overflow in calculations
-    * below.
     */
    scale = Max(scale, -NUMERIC_MAX_RESULT_SCALE);
    scale = Min(scale, NUMERIC_MAX_RESULT_SCALE);
@@ -593,14 +654,11 @@ numeric_trunc(PG_FUNCTION_ARGS)
    init_var(&arg);
    set_var_from_num(num, &arg);
 
-   arg.ndigits = Min(arg.ndigits, Max(0, arg.weight + scale + 1));
+   trunc_var(&arg, scale);
 
-   /*
-    * Set result's scale to something reasonable.
-    */
-   scale = Min(NUMERIC_MAX_DISPLAY_SCALE, Max(0, scale));
-   arg.rscale = scale;
-   arg.dscale = scale;
+   /* We don't allow negative output dscale */
+   if (scale < 0)
+       arg.dscale = 0;
 
    /*
     * Return the truncated result
@@ -612,11 +670,10 @@ numeric_trunc(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_ceil() -
  *
  * Return the smallest integer greater than or equal to the argument
- * ----------
  */
 Datum
 numeric_ceil(PG_FUNCTION_ARGS)
@@ -633,8 +690,6 @@ numeric_ceil(PG_FUNCTION_ARGS)
    set_var_from_num(num, &result);
    ceil_var(&result, &result);
 
-   result.dscale = 0;
-
    res = make_result(&result);
    free_var(&result);
 
@@ -642,11 +697,10 @@ numeric_ceil(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_floor() -
  *
  * Return the largest integer equal to or less than the argument
- * ----------
  */
 Datum
 numeric_floor(PG_FUNCTION_ARGS)
@@ -663,8 +717,6 @@ numeric_floor(PG_FUNCTION_ARGS)
    set_var_from_num(num, &result);
    floor_var(&result, &result);
 
-   result.dscale = 0;
-
    res = make_result(&result);
    free_var(&result);
 
@@ -833,17 +885,16 @@ cmp_numerics(Numeric num1, Numeric num2)
 
 /* ----------------------------------------------------------------------
  *
- * Arithmetic base functions
+ * Basic arithmetic functions
  *
  * ----------------------------------------------------------------------
  */
 
 
-/* ----------
+/*
  * numeric_add() -
  *
  * Add two numerics
- * ----------
  */
 Datum
 numeric_add(PG_FUNCTION_ARGS)
@@ -863,8 +914,6 @@ numeric_add(PG_FUNCTION_ARGS)
 
    /*
     * Unpack the values, let add_var() compute the result and return it.
-    * The internals of add_var() will automatically set the correct
-    * result and display scales in the result.
     */
    init_var(&arg1);
    init_var(&arg2);
@@ -874,6 +923,7 @@ numeric_add(PG_FUNCTION_ARGS)
    set_var_from_num(num2, &arg2);
 
    add_var(&arg1, &arg2, &result);
+
    res = make_result(&result);
 
    free_var(&arg1);
@@ -884,11 +934,10 @@ numeric_add(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_sub() -
  *
  * Subtract one numeric from another
- * ----------
  */
 Datum
 numeric_sub(PG_FUNCTION_ARGS)
@@ -907,8 +956,7 @@ numeric_sub(PG_FUNCTION_ARGS)
        PG_RETURN_NUMERIC(make_result(&const_nan));
 
    /*
-    * Unpack the two arguments, let sub_var() compute the result and
-    * return it.
+    * Unpack the values, let sub_var() compute the result and return it.
     */
    init_var(&arg1);
    init_var(&arg2);
@@ -918,6 +966,7 @@ numeric_sub(PG_FUNCTION_ARGS)
    set_var_from_num(num2, &arg2);
 
    sub_var(&arg1, &arg2, &result);
+
    res = make_result(&result);
 
    free_var(&arg1);
@@ -928,11 +977,10 @@ numeric_sub(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_mul() -
  *
  * Calculate the product of two numerics
- * ----------
  */
 Datum
 numeric_mul(PG_FUNCTION_ARGS)
@@ -951,12 +999,11 @@ numeric_mul(PG_FUNCTION_ARGS)
        PG_RETURN_NUMERIC(make_result(&const_nan));
 
    /*
-    * Unpack the arguments, let mul_var() compute the result and return
-    * it. Unlike add_var() and sub_var(), mul_var() will round the result
-    * to the scale stored in global_rscale. In the case of numeric_mul(),
-    * which is invoked for the * operator on numerics, we set it to the
-    * exact representation for the product (rscale = sum(rscale of arg1,
-    * rscale of arg2) and the same for the dscale).
+    * Unpack the values, let mul_var() compute the result and return it.
+    * Unlike add_var() and sub_var(), mul_var() will round its result.
+    * In the case of numeric_mul(), which is invoked for the * operator on
+    * numerics, we request exact representation for the product (rscale =
+    * sum(dscale of arg1, dscale of arg2)).
     */
    init_var(&arg1);
    init_var(&arg2);
@@ -965,11 +1012,7 @@ numeric_mul(PG_FUNCTION_ARGS)
    set_var_from_num(num1, &arg1);
    set_var_from_num(num2, &arg2);
 
-   global_rscale = arg1.rscale + arg2.rscale;
-
-   mul_var(&arg1, &arg2, &result);
-
-   result.dscale = arg1.dscale + arg2.dscale;
+   mul_var(&arg1, &arg2, &result, arg1.dscale + arg2.dscale);
 
    res = make_result(&result);
 
@@ -981,11 +1024,10 @@ numeric_mul(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_div() -
  *
  * Divide one numeric into another
- * ----------
  */
 Datum
 numeric_div(PG_FUNCTION_ARGS)
@@ -996,7 +1038,7 @@ numeric_div(PG_FUNCTION_ARGS)
    NumericVar  arg2;
    NumericVar  result;
    Numeric     res;
-   int         res_dscale;
+   int         rscale;
 
    /*
     * Handle NaN
@@ -1014,14 +1056,15 @@ numeric_div(PG_FUNCTION_ARGS)
    set_var_from_num(num1, &arg1);
    set_var_from_num(num2, &arg2);
 
-   res_dscale = select_div_scale(&arg1, &arg2);
-
    /*
-    * Do the divide, set the display scale and return the result
+    * Select scale for division result
     */
-   div_var(&arg1, &arg2, &result);
+   rscale = select_div_scale(&arg1, &arg2);
 
-   result.dscale = res_dscale;
+   /*
+    * Do the divide and return the result
+    */
+   div_var(&arg1, &arg2, &result, rscale);
 
    res = make_result(&result);
 
@@ -1033,11 +1076,10 @@ numeric_div(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_mod() -
  *
  * Calculate the modulo of two numerics
- * ----------
  */
 Datum
 numeric_mod(PG_FUNCTION_ARGS)
@@ -1071,11 +1113,10 @@ numeric_mod(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_inc() -
  *
  * Increment a number by one
- * ----------
  */
 Datum
 numeric_inc(PG_FUNCTION_ARGS)
@@ -1098,6 +1139,7 @@ numeric_inc(PG_FUNCTION_ARGS)
    set_var_from_num(num, &arg);
 
    add_var(&arg, &const_one, &arg);
+
    res = make_result(&arg);
 
    free_var(&arg);
@@ -1106,11 +1148,10 @@ numeric_inc(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_smaller() -
  *
  * Return the smaller of two numbers
- * ----------
  */
 Datum
 numeric_smaller(PG_FUNCTION_ARGS)
@@ -1148,11 +1189,10 @@ numeric_smaller(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_larger() -
  *
  * Return the larger of two numbers
- * ----------
  */
 Datum
 numeric_larger(PG_FUNCTION_ARGS)
@@ -1192,17 +1232,16 @@ numeric_larger(PG_FUNCTION_ARGS)
 
 /* ----------------------------------------------------------------------
  *
- * Complex math functions
+ * Advanced math functions
  *
  * ----------------------------------------------------------------------
  */
 
 
-/* ----------
+/*
  * numeric_sqrt() -
  *
  * Compute the square root of a numeric.
- * ----------
  */
 Datum
 numeric_sqrt(PG_FUNCTION_ARGS)
@@ -1212,7 +1251,7 @@ numeric_sqrt(PG_FUNCTION_ARGS)
    NumericVar  arg;
    NumericVar  result;
    int         sweight;
-   int         res_dscale;
+   int         rscale;
 
    /*
     * Handle NaN
@@ -1221,7 +1260,7 @@ numeric_sqrt(PG_FUNCTION_ARGS)
        PG_RETURN_NUMERIC(make_result(&const_nan));
 
    /*
-    * Unpack the argument and determine the scales.  We choose a display
+    * Unpack the argument and determine the result scale.  We choose a
     * scale to give at least NUMERIC_MIN_SIG_DIGITS significant digits;
     * but in any case not less than the input's dscale.
     */
@@ -1231,21 +1270,17 @@ numeric_sqrt(PG_FUNCTION_ARGS)
    set_var_from_num(num, &arg);
 
    /* Assume the input was normalized, so arg.weight is accurate */
-   sweight = (arg.weight / 2) - 1;
-
-   res_dscale = NUMERIC_MIN_SIG_DIGITS - sweight;
-   res_dscale = Max(res_dscale, arg.dscale);
-   res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE);
-   res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
+   sweight = (arg.weight + 1) * DEC_DIGITS / 2 - 1;
 
-   global_rscale = res_dscale + 8;
+   rscale = NUMERIC_MIN_SIG_DIGITS - sweight;
+   rscale = Max(rscale, arg.dscale);
+   rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+   rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
 
    /*
     * Let sqrt_var() do the calculation and return the result.
     */
-   sqrt_var(&arg, &result);
-
-   result.dscale = res_dscale;
+   sqrt_var(&arg, &result, rscale);
 
    res = make_result(&result);
 
@@ -1256,11 +1291,10 @@ numeric_sqrt(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_exp() -
  *
  * Raise e to the power of x
- * ----------
  */
 Datum
 numeric_exp(PG_FUNCTION_ARGS)
@@ -1269,7 +1303,7 @@ numeric_exp(PG_FUNCTION_ARGS)
    Numeric     res;
    NumericVar  arg;
    NumericVar  result;
-   int         res_dscale;
+   int         rscale;
    double      val;
 
    /*
@@ -1279,7 +1313,7 @@ numeric_exp(PG_FUNCTION_ARGS)
        PG_RETURN_NUMERIC(make_result(&const_nan));
 
    /*
-    * Unpack the argument and determine the scales.  We choose a display
+    * Unpack the argument and determine the result scale.  We choose a
     * scale to give at least NUMERIC_MIN_SIG_DIGITS significant digits;
     * but in any case not less than the input's dscale.
     */
@@ -1289,28 +1323,27 @@ numeric_exp(PG_FUNCTION_ARGS)
    set_var_from_num(num, &arg);
 
    /* convert input to float8, ignoring overflow */
-   val = numeric_to_double_no_overflow(num);
+   val = numericvar_to_double_no_overflow(&arg);
 
-   /* log10(result) = num * log10(e), so this is approximately the weight: */
+   /*
+    * log10(result) = num * log10(e), so this is approximately the decimal
+    * weight of the result:
+    */
    val *= 0.434294481903252;
 
    /* limit to something that won't cause integer overflow */
    val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
    val = Min(val, NUMERIC_MAX_RESULT_SCALE);
 
-   res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
-   res_dscale = Max(res_dscale, arg.dscale);
-   res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE);
-   res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
-
-   global_rscale = res_dscale + NUMERIC_EXTRA_DIGITS;
+   rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
+   rscale = Max(rscale, arg.dscale);
+   rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+   rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
 
    /*
     * Let exp_var() do the calculation and return the result.
     */
-   exp_var(&arg, &result);
-
-   result.dscale = res_dscale;
+   exp_var(&arg, &result, rscale);
 
    res = make_result(&result);
 
@@ -1321,11 +1354,10 @@ numeric_exp(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_ln() -
  *
  * Compute the natural logarithm of x
- * ----------
  */
 Datum
 numeric_ln(PG_FUNCTION_ARGS)
@@ -1334,7 +1366,8 @@ numeric_ln(PG_FUNCTION_ARGS)
    Numeric     res;
    NumericVar  arg;
    NumericVar  result;
-   int         res_dscale;
+   int         dec_digits;
+   int         rscale;
 
    /*
     * Handle NaN
@@ -1347,22 +1380,21 @@ numeric_ln(PG_FUNCTION_ARGS)
 
    set_var_from_num(num, &arg);
 
-   if (arg.weight > 0)
-       res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(arg.weight);
-   else if (arg.weight < 0)
-       res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(- arg.weight);
-   else
-       res_dscale = NUMERIC_MIN_SIG_DIGITS;
-
-   res_dscale = Max(res_dscale, arg.dscale);
-   res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE);
-   res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
+   /* Approx decimal digits before decimal point */
+   dec_digits = (arg.weight + 1) * DEC_DIGITS;
 
-   global_rscale = res_dscale + NUMERIC_EXTRA_DIGITS;
+   if (dec_digits > 1)
+       rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
+   else if (dec_digits < 1)
+       rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
+   else
+       rscale = NUMERIC_MIN_SIG_DIGITS;
 
-   ln_var(&arg, &result);
+   rscale = Max(rscale, arg.dscale);
+   rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+   rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
 
-   result.dscale = res_dscale;
+   ln_var(&arg, &result, rscale);
 
    res = make_result(&result);
 
@@ -1373,11 +1405,10 @@ numeric_ln(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_log() -
  *
  * Compute the logarithm of x in a given base
- * ----------
  */
 Datum
 numeric_log(PG_FUNCTION_ARGS)
@@ -1407,7 +1438,7 @@ numeric_log(PG_FUNCTION_ARGS)
 
    /*
     * Call log_var() to compute and return the result; note it handles
-    * rscale/dscale itself.
+    * scale selection itself.
     */
    log_var(&arg1, &arg2, &result);
 
@@ -1421,11 +1452,10 @@ numeric_log(PG_FUNCTION_ARGS)
 }
 
 
-/* ----------
+/*
  * numeric_power() -
  *
  * Raise b to the power of x
- * ----------
  */
 Datum
 numeric_power(PG_FUNCTION_ARGS)
@@ -1455,7 +1485,7 @@ numeric_power(PG_FUNCTION_ARGS)
 
    /*
     * Call power_var() to compute and return the result; note it handles
-    * rscale/dscale itself.
+    * scale selection itself.
     */
    power_var(&arg1, &arg2, &result);
 
@@ -1483,17 +1513,14 @@ int4_numeric(PG_FUNCTION_ARGS)
    int32       val = PG_GETARG_INT32(0);
    Numeric     res;
    NumericVar  result;
-   char       *tmp;
 
    init_var(&result);
 
-   tmp = DatumGetCString(DirectFunctionCall1(int4out,
-                                             Int32GetDatum(val)));
-   set_var_from_str(tmp, &result);
+   int8_to_numericvar((int64) val, &result);
+
    res = make_result(&result);
 
    free_var(&result);
-   pfree(tmp);
 
    PG_RETURN_NUMERIC(res);
 }
@@ -1504,46 +1531,47 @@ numeric_int4(PG_FUNCTION_ARGS)
 {
    Numeric     num = PG_GETARG_NUMERIC(0);
    NumericVar  x;
-   char       *str;
-   Datum       result;
+   int64       val;
+   int32       result;
 
    /* XXX would it be better to return NULL? */
    if (NUMERIC_IS_NAN(num))
        elog(ERROR, "Cannot convert NaN to int4");
 
-   /*
-    * Get the number in the variable format so we can round to integer.
-    */
+   /* Convert to variable format and thence to int8 */
    init_var(&x);
    set_var_from_num(num, &x);
 
-   str = get_str_from_var(&x, 0);      /* dscale = 0 produces rounding */
+   if (!numericvar_to_int8(&x, &val))
+       elog(ERROR, "numeric conversion to int4 is out of range");
 
    free_var(&x);
 
-   result = DirectFunctionCall1(int4in, CStringGetDatum(str));
-   pfree(str);
+   /* Down-convert to int4 */
+   result = (int32) val;
 
-   PG_RETURN_DATUM(result);
+   /* Test for overflow by reverse-conversion. */
+   if ((int64) result != val)
+       elog(ERROR, "numeric conversion to int4 is out of range");
+
+   PG_RETURN_INT32(result);
 }
 
 
 Datum
 int8_numeric(PG_FUNCTION_ARGS)
 {
-   Datum       val = PG_GETARG_DATUM(0);
+   int64       val = PG_GETARG_INT64(0);
    Numeric     res;
    NumericVar  result;
-   char       *tmp;
 
    init_var(&result);
 
-   tmp = DatumGetCString(DirectFunctionCall1(int8out, val));
-   set_var_from_str(tmp, &result);
+   int8_to_numericvar(val, &result);
+
    res = make_result(&result);
 
    free_var(&result);
-   pfree(tmp);
 
    PG_RETURN_NUMERIC(res);
 }
@@ -1554,27 +1582,22 @@ numeric_int8(PG_FUNCTION_ARGS)
 {
    Numeric     num = PG_GETARG_NUMERIC(0);
    NumericVar  x;
-   char       *str;
-   Datum       result;
+   int64       result;
 
    /* XXX would it be better to return NULL? */
    if (NUMERIC_IS_NAN(num))
        elog(ERROR, "Cannot convert NaN to int8");
 
-   /*
-    * Get the number in the variable format so we can round to integer.
-    */
+   /* Convert to variable format and thence to int8 */
    init_var(&x);
    set_var_from_num(num, &x);
 
-   str = get_str_from_var(&x, 0);      /* dscale = 0 produces rounding */
+   if (!numericvar_to_int8(&x, &result))
+       elog(ERROR, "numeric conversion to int8 is out of range");
 
    free_var(&x);
 
-   result = DirectFunctionCall1(int8in, CStringGetDatum(str));
-   pfree(str);
-
-   PG_RETURN_DATUM(result);
+   PG_RETURN_INT64(result);
 }
 
 
@@ -1584,17 +1607,14 @@ int2_numeric(PG_FUNCTION_ARGS)
    int16       val = PG_GETARG_INT16(0);
    Numeric     res;
    NumericVar  result;
-   char       *tmp;
 
    init_var(&result);
 
-   tmp = DatumGetCString(DirectFunctionCall1(int2out,
-                                             Int16GetDatum(val)));
-   set_var_from_str(tmp, &result);
+   int8_to_numericvar((int64) val, &result);
+
    res = make_result(&result);
 
    free_var(&result);
-   pfree(tmp);
 
    PG_RETURN_NUMERIC(res);
 }
@@ -1605,27 +1625,30 @@ numeric_int2(PG_FUNCTION_ARGS)
 {
    Numeric     num = PG_GETARG_NUMERIC(0);
    NumericVar  x;
-   char       *str;
-   Datum       result;
+   int64       val;
+   int16       result;
 
    /* XXX would it be better to return NULL? */
    if (NUMERIC_IS_NAN(num))
        elog(ERROR, "Cannot convert NaN to int2");
 
-   /*
-    * Get the number in the variable format so we can round to integer.
-    */
+   /* Convert to variable format and thence to int8 */
    init_var(&x);
    set_var_from_num(num, &x);
 
-   str = get_str_from_var(&x, 0);      /* dscale = 0 produces rounding */
+   if (!numericvar_to_int8(&x, &val))
+       elog(ERROR, "numeric conversion to int2 is out of range");
 
    free_var(&x);
 
-   result = DirectFunctionCall1(int2in, CStringGetDatum(str));
-   pfree(str);
+   /* Down-convert to int2 */
+   result = (int16) val;
 
-   return result;
+   /* Test for overflow by reverse-conversion. */
+   if ((int64) result != val)
+       elog(ERROR, "numeric conversion to int2 is out of range");
+
+   PG_RETURN_INT16(result);
 }
 
 
@@ -1926,7 +1949,7 @@ numeric_variance(PG_FUNCTION_ARGS)
                vsumX,
                vsumX2,
                vNminus1;
-   int         div_dscale;
+   int         rscale;
 
    /* We assume the input is array of numeric */
    deconstruct_array(transarray,
@@ -1964,11 +1987,11 @@ numeric_variance(PG_FUNCTION_ARGS)
    init_var(&vsumX2);
    set_var_from_num(sumX2, &vsumX2);
 
-   /* set rscale for mul_var calls */
-   global_rscale = vsumX.rscale * 2;
+   /* compute rscale for mul_var calls */
+   rscale = vsumX.dscale * 2;
 
-   mul_var(&vsumX, &vsumX, &vsumX);    /* now vsumX contains sumX * sumX */
-   mul_var(&vN, &vsumX2, &vsumX2);     /* now vsumX2 contains N * sumX2 */
+   mul_var(&vsumX, &vsumX, &vsumX, rscale);    /* vsumX = sumX * sumX */
+   mul_var(&vN, &vsumX2, &vsumX2, rscale);     /* vsumX2 = N * sumX2 */
    sub_var(&vsumX2, &vsumX, &vsumX2);  /* N * sumX2 - sumX * sumX */
 
    if (cmp_var(&vsumX2, &const_zero) <= 0)
@@ -1978,10 +2001,9 @@ numeric_variance(PG_FUNCTION_ARGS)
    }
    else
    {
-       mul_var(&vN, &vNminus1, &vNminus1);     /* N * (N - 1) */
-       div_dscale = select_div_scale(&vsumX2, &vNminus1);
-       div_var(&vsumX2, &vNminus1, &vsumX);    /* variance */
-       vsumX.dscale = div_dscale;
+       mul_var(&vN, &vNminus1, &vNminus1, 0);      /* N * (N - 1) */
+       rscale = select_div_scale(&vsumX2, &vNminus1);
+       div_var(&vsumX2, &vNminus1, &vsumX, rscale);    /* variance */
 
        res = make_result(&vsumX);
    }
@@ -2008,7 +2030,7 @@ numeric_stddev(PG_FUNCTION_ARGS)
                vsumX,
                vsumX2,
                vNminus1;
-   int         div_dscale;
+   int         rscale;
 
    /* We assume the input is array of numeric */
    deconstruct_array(transarray,
@@ -2046,11 +2068,11 @@ numeric_stddev(PG_FUNCTION_ARGS)
    init_var(&vsumX2);
    set_var_from_num(sumX2, &vsumX2);
 
-   /* set rscale for mul_var calls */
-   global_rscale = vsumX.rscale * 2;
+   /* compute rscale for mul_var calls */
+   rscale = vsumX.dscale * 2;
 
-   mul_var(&vsumX, &vsumX, &vsumX);    /* now vsumX contains sumX * sumX */
-   mul_var(&vN, &vsumX2, &vsumX2);     /* now vsumX2 contains N * sumX2 */
+   mul_var(&vsumX, &vsumX, &vsumX, rscale);    /* vsumX = sumX * sumX */
+   mul_var(&vN, &vsumX2, &vsumX2, rscale);     /* vsumX2 = N * sumX2 */
    sub_var(&vsumX2, &vsumX, &vsumX2);  /* N * sumX2 - sumX * sumX */
 
    if (cmp_var(&vsumX2, &const_zero) <= 0)
@@ -2060,11 +2082,10 @@ numeric_stddev(PG_FUNCTION_ARGS)
    }
    else
    {
-       mul_var(&vN, &vNminus1, &vNminus1);     /* N * (N - 1) */
-       div_dscale = select_div_scale(&vsumX2, &vNminus1);
-       div_var(&vsumX2, &vNminus1, &vsumX);    /* variance */
-       vsumX.dscale = div_dscale;
-       sqrt_var(&vsumX, &vsumX);       /* stddev */
+       mul_var(&vN, &vNminus1, &vNminus1, 0);      /* N * (N - 1) */
+       rscale = select_div_scale(&vsumX2, &vNminus1);
+       div_var(&vsumX2, &vNminus1, &vsumX, rscale);    /* variance */
+       sqrt_var(&vsumX, &vsumX, rscale);       /* stddev */
 
        res = make_result(&vsumX);
    }
@@ -2267,25 +2288,26 @@ int8_avg(PG_FUNCTION_ARGS)
 
 /* ----------------------------------------------------------------------
  *
- * Local functions follow
+ * Debug support
  *
  * ----------------------------------------------------------------------
  */
 
-
 #ifdef NUMERIC_DEBUG
 
-/* ----------
+/*
  * dump_numeric() - Dump a value in the db storage format for debugging
- * ----------
  */
 static void
-dump_numeric(char *str, Numeric num)
+dump_numeric(const char *str, Numeric num)
 {
+   NumericDigit *digits = (NumericDigit *) num->n_data;
+   int         ndigits;
    int         i;
 
-   printf("%s: NUMERIC w=%d r=%d d=%d ", str, num->n_weight, num->n_rscale,
-          NUMERIC_DSCALE(num));
+   ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit);
+
+   printf("%s: NUMERIC w=%d d=%d ", str, num->n_weight, NUMERIC_DSCALE(num));
    switch (NUMERIC_SIGN(num))
    {
        case NUMERIC_POS:
@@ -2302,23 +2324,21 @@ dump_numeric(char *str, Numeric num)
            break;
    }
 
-   for (i = 0; i < num->varlen - NUMERIC_HDRSZ; i++)
-       printf(" %d %d", (num->n_data[i] >> 4) & 0x0f, num->n_data[i] & 0x0f);
+   for (i = 0; i < ndigits; i++)
+       printf(" %0*d", DEC_DIGITS, digits[i]);
    printf("\n");
 }
 
 
-/* ----------
+/*
  * dump_var() - Dump a value in the variable format for debugging
- * ----------
  */
 static void
-dump_var(char *str, NumericVar *var)
+dump_var(const char *str, NumericVar *var)
 {
    int         i;
 
-   printf("%s: VAR w=%d r=%d d=%d ", str, var->weight, var->rscale,
-          var->dscale);
+   printf("%s: VAR w=%d d=%d ", str, var->weight, var->dscale);
    switch (var->sign)
    {
        case NUMERIC_POS:
@@ -2336,35 +2356,45 @@ dump_var(char *str, NumericVar *var)
    }
 
    for (i = 0; i < var->ndigits; i++)
-       printf(" %d", var->digits[i]);
+       printf(" %0*d", DEC_DIGITS, var->digits[i]);
 
    printf("\n");
 }
+
 #endif   /* NUMERIC_DEBUG */
 
 
-/* ----------
+/* ----------------------------------------------------------------------
+ *
+ * Local functions follow
+ *
+ * In general, these do not support NaNs --- callers must eliminate
+ * the possibility of NaN first.  (make_result() is an exception.)
+ *
+ * ----------------------------------------------------------------------
+ */
+
+
+/*
  * alloc_var() -
  *
  * Allocate a digit buffer of ndigits digits (plus a spare digit for rounding)
- * ----------
  */
 static void
 alloc_var(NumericVar *var, int ndigits)
 {
    digitbuf_free(var->buf);
    var->buf = digitbuf_alloc(ndigits + 1);
-   var->buf[0] = 0;
+   var->buf[0] = 0;                /* spare digit for rounding */
    var->digits = var->buf + 1;
    var->ndigits = ndigits;
 }
 
 
-/* ----------
+/*
  * free_var() -
  *
  * Return the digit buffer of a variable to the free pool
- * ----------
  */
 static void
 free_var(NumericVar *var)
@@ -2376,12 +2406,11 @@ free_var(NumericVar *var)
 }
 
 
-/* ----------
+/*
  * zero_var() -
  *
  * Set a variable to ZERO.
- * Note: rscale and dscale are not touched.
- * ----------
+ * Note: its dscale is not touched.
  */
 static void
 zero_var(NumericVar *var)
@@ -2395,19 +2424,33 @@ zero_var(NumericVar *var)
 }
 
 
-/* ----------
+/*
  * set_var_from_str()
  *
  * Parse a string and put the number into a variable
- * ----------
  */
 static void
-set_var_from_str(char *str, NumericVar *dest)
+set_var_from_str(const char *str, NumericVar *dest)
 {
-   char       *cp = str;
+   const char *cp = str;
    bool        have_dp = FALSE;
-   int         i = 0;
+   int         i;
+   unsigned char *decdigits;
+   int         sign = NUMERIC_POS;
+   int         dweight = -1;
+   int         ddigits;
+   int         dscale = 0;
+   int         weight;
+   int         ndigits;
+   int         offset;
+   NumericDigit *digits;
+
+   /*
+    * We first parse the string to extract decimal digits and determine the
+    * correct decimal weight.  Then convert to NBASE representation.
+    */
 
+   /* skip leading spaces */
    while (*cp)
    {
        if (!isspace((unsigned char) *cp))
@@ -2415,20 +2458,15 @@ set_var_from_str(char *str, NumericVar *dest)
        cp++;
    }
 
-   alloc_var(dest, strlen(cp));
-   dest->weight = -1;
-   dest->dscale = 0;
-   dest->sign = NUMERIC_POS;
-
    switch (*cp)
    {
        case '+':
-           dest->sign = NUMERIC_POS;
+           sign = NUMERIC_POS;
            cp++;
            break;
 
        case '-':
-           dest->sign = NUMERIC_NEG;
+           sign = NUMERIC_NEG;
            cp++;
            break;
    }
@@ -2442,15 +2480,21 @@ set_var_from_str(char *str, NumericVar *dest)
    if (!isdigit((unsigned char) *cp))
        elog(ERROR, "Bad numeric input format '%s'", str);
 
+   decdigits = (unsigned char *) palloc(strlen(cp) + DEC_DIGITS*2);
+
+   /* leading padding for digit alignment later */
+   memset(decdigits, 0, DEC_DIGITS);
+   i = DEC_DIGITS;
+
    while (*cp)
    {
        if (isdigit((unsigned char) *cp))
        {
-           dest->digits[i++] = *cp++ - '0';
+           decdigits[i++] = *cp++ - '0';
            if (!have_dp)
-               dest->weight++;
+               dweight++;
            else
-               dest->dscale++;
+               dscale++;
        }
        else if (*cp == '.')
        {
@@ -2462,7 +2506,10 @@ set_var_from_str(char *str, NumericVar *dest)
        else
            break;
    }
-   dest->ndigits = i;
+
+   ddigits = i - DEC_DIGITS;
+   /* trailing padding for digit alignment later */
+   memset(decdigits + i, 0, DEC_DIGITS-1);
 
    /* Handle exponent, if any */
    if (*cp == 'e' || *cp == 'E')
@@ -2478,10 +2525,10 @@ set_var_from_str(char *str, NumericVar *dest)
        if (exponent > NUMERIC_MAX_PRECISION ||
            exponent < -NUMERIC_MAX_PRECISION)
            elog(ERROR, "Bad numeric input format '%s'", str);
-       dest->weight += (int) exponent;
-       dest->dscale -= (int) exponent;
-       if (dest->dscale < 0)
-           dest->dscale = 0;
+       dweight += (int) exponent;
+       dscale -= (int) exponent;
+       if (dscale < 0)
+           dscale = 0;
    }
 
    /* Should be nothing left but spaces */
@@ -2492,60 +2539,75 @@ set_var_from_str(char *str, NumericVar *dest)
        cp++;
    }
 
-   /* Strip any leading zeroes */
-   while (dest->ndigits > 0 && *(dest->digits) == 0)
+   /*
+    * Okay, convert pure-decimal representation to base NBASE.  First we
+    * need to determine the converted weight and ndigits.  offset is the
+    * number of decimal zeroes to insert before the first given digit to
+    * have a correctly aligned first NBASE digit.
+    */
+   if (dweight >= 0)
+       weight = (dweight + 1 + DEC_DIGITS-1) / DEC_DIGITS - 1;
+   else
+       weight = - ((-dweight - 1) / DEC_DIGITS + 1);
+   offset = (weight + 1) * DEC_DIGITS - (dweight + 1);
+   ndigits = (ddigits + offset + DEC_DIGITS-1) / DEC_DIGITS;
+
+   alloc_var(dest, ndigits);
+   dest->sign = sign;
+   dest->weight = weight;
+   dest->dscale = dscale;
+
+   i = DEC_DIGITS - offset;
+   digits = dest->digits;
+
+   while (ndigits-- > 0)
    {
-       (dest->digits)++;
-       (dest->weight)--;
-       (dest->ndigits)--;
+#if DEC_DIGITS == 4
+       *digits++ = ((decdigits[i] * 10 + decdigits[i+1]) * 10 +
+                    decdigits[i+2]) * 10 + decdigits[i+3];
+#elif DEC_DIGITS == 2
+       *digits++ = decdigits[i] * 10 + decdigits[i+1];
+#elif DEC_DIGITS == 1
+       *digits++ = decdigits[i];
+#else
+#error unsupported NBASE
+#endif
+       i += DEC_DIGITS;
    }
-   if (dest->ndigits == 0)
-       dest->weight = 0;
 
-   dest->rscale = dest->dscale;
+   pfree(decdigits);
+
+   /* Strip any leading/trailing zeroes, and normalize weight if zero */
+   strip_var(dest);
 }
 
 
 /*
  * set_var_from_num() -
  *
- * Parse back the packed db format into a variable
- *
+ * Convert the packed db format into a variable
  */
 static void
 set_var_from_num(Numeric num, NumericVar *dest)
 {
-   NumericDigit *digit;
-   int         i;
-   int         n;
+   int         ndigits;
 
-   n = num->varlen - NUMERIC_HDRSZ;    /* number of digit-pairs in packed
-                                        * fmt */
+   ndigits = (num->varlen - NUMERIC_HDRSZ) / sizeof(NumericDigit);
 
-   alloc_var(dest, n * 2);
+   alloc_var(dest, ndigits);
 
    dest->weight = num->n_weight;
-   dest->rscale = num->n_rscale;
-   dest->dscale = NUMERIC_DSCALE(num);
    dest->sign = NUMERIC_SIGN(num);
+   dest->dscale = NUMERIC_DSCALE(num);
 
-   digit = dest->digits;
-
-   for (i = 0; i < n; i++)
-   {
-       unsigned char digitpair = num->n_data[i];
-
-       *digit++ = (digitpair >> 4) & 0x0f;
-       *digit++ = digitpair & 0x0f;
-   }
+   memcpy(dest->digits, num->n_data, ndigits * sizeof(NumericDigit));
 }
 
 
-/* ----------
+/*
  * set_var_from_var() -
  *
  * Copy one variable into another
- * ----------
  */
 static void
 set_var_from_var(NumericVar *value, NumericVar *dest)
@@ -2554,7 +2616,7 @@ set_var_from_var(NumericVar *value, NumericVar *dest)
 
    newbuf = digitbuf_alloc(value->ndigits + 1);
    newbuf[0] = 0;              /* spare digit for rounding */
-   memcpy(newbuf + 1, value->digits, value->ndigits);
+   memcpy(newbuf + 1, value->digits, value->ndigits * sizeof(NumericDigit));
 
    digitbuf_free(dest->buf);
 
@@ -2564,56 +2626,47 @@ set_var_from_var(NumericVar *value, NumericVar *dest)
 }
 
 
-/* ----------
+/*
  * get_str_from_var() -
  *
  * Convert a var to text representation (guts of numeric_out).
  * CAUTION: var's contents may be modified by rounding!
- * Caller must have checked for NaN case.
  * Returns a palloc'd string.
- * ----------
  */
 static char *
 get_str_from_var(NumericVar *var, int dscale)
 {
    char       *str;
    char       *cp;
+   char       *endcp;
    int         i;
    int         d;
+   NumericDigit    dig;
+#if DEC_DIGITS > 1
+   NumericDigit    d1;
+#endif
+
+   if (dscale < 0)
+       dscale = 0;
 
    /*
     * Check if we must round up before printing the value and do so.
     */
-   i = dscale + var->weight + 1;
-   if (i >= 0 && var->ndigits > i)
-   {
-       int         carry = (var->digits[i] > 4) ? 1 : 0;
-
-       var->ndigits = i;
-
-       while (carry)
-       {
-           carry += var->digits[--i];
-           var->digits[i] = carry % 10;
-           carry /= 10;
-       }
-
-       if (i < 0)
-       {
-           Assert(i == -1);    /* better not have added more than 1 digit */
-           Assert(var->digits > var->buf);
-           var->digits--;
-           var->ndigits++;
-           var->weight++;
-       }
-   }
-   else
-       var->ndigits = Max(0, Min(i, var->ndigits));
+   round_var(var, dscale);
 
    /*
-    * Allocate space for the result
+    * Allocate space for the result.
+    *
+    * i is set to to # of decimal digits before decimal point.
+    * dscale is the # of decimal digits we will print after decimal point.
+    * We may generate as many as DEC_DIGITS-1 excess digits at the end,
+    * and in addition we need room for sign, decimal point, null terminator.
     */
-   str = palloc(Max(0, dscale) + Max(0, var->weight) + 4);
+   i = (var->weight + 1) * DEC_DIGITS;
+   if (i <= 0)
+       i = 1;
+
+   str = palloc(i + dscale + DEC_DIGITS + 2);
    cp = str;
 
    /*
@@ -2625,33 +2678,87 @@ get_str_from_var(NumericVar *var, int dscale)
    /*
     * Output all digits before the decimal point
     */
-   i = Max(var->weight, 0);
-   d = 0;
-
-   while (i >= 0)
+   if (var->weight < 0)
    {
-       if (i <= var->weight && d < var->ndigits)
-           *cp++ = var->digits[d++] + '0';
-       else
-           *cp++ = '0';
-       i--;
+       d = var->weight + 1;
+       *cp++ = '0';
+   }
+   else
+   {
+       for (d = 0; d <= var->weight; d++)
+       {
+           dig = (d < var->ndigits) ? var->digits[d] : 0;
+           /* In the first digit, suppress extra leading decimal zeroes */
+#if DEC_DIGITS == 4
+           {
+               bool    putit = (d > 0);
+
+               d1 = dig / 1000;
+               dig -= d1 * 1000;
+               putit |= (d1 > 0);
+               if (putit)
+                   *cp++ = d1 + '0';
+               d1 = dig / 100;
+               dig -= d1 * 100;
+               putit |= (d1 > 0);
+               if (putit)
+                   *cp++ = d1 + '0';
+               d1 = dig / 10;
+               dig -= d1 * 10;
+               putit |= (d1 > 0);
+               if (putit)
+                   *cp++ = d1 + '0';
+               *cp++ = dig + '0';
+           }
+#elif DEC_DIGITS == 2
+           d1 = dig / 10;
+           dig -= d1 * 10;
+           if (d1 > 0 || d > 0)
+               *cp++ = d1 + '0';
+           *cp++ = dig + '0';
+#elif DEC_DIGITS == 1
+           *cp++ = dig + '0';
+#else
+#error unsupported NBASE
+#endif
+       }
    }
 
    /*
     * If requested, output a decimal point and all the digits that follow
-    * it.
+    * it.  We initially put out a multiple of DEC_DIGITS digits, then
+    * truncate if needed.
     */
    if (dscale > 0)
    {
        *cp++ = '.';
-       while (i >= -dscale)
+       endcp = cp + dscale;
+       for (i = 0; i < dscale; d++, i += DEC_DIGITS)
        {
-           if (i <= var->weight && d < var->ndigits)
-               *cp++ = var->digits[d++] + '0';
-           else
-               *cp++ = '0';
-           i--;
+           dig = (d >= 0 && d < var->ndigits) ? var->digits[d] : 0;
+#if DEC_DIGITS == 4
+           d1 = dig / 1000;
+           dig -= d1 * 1000;
+           *cp++ = d1 + '0';
+           d1 = dig / 100;
+           dig -= d1 * 100;
+           *cp++ = d1 + '0';
+           d1 = dig / 10;
+           dig -= d1 * 10;
+           *cp++ = d1 + '0';
+           *cp++ = dig + '0';
+#elif DEC_DIGITS == 2
+           d1 = dig / 10;
+           dig -= d1 * 10;
+           *cp++ = d1 + '0';
+           *cp++ = dig + '0';
+#elif DEC_DIGITS == 1
+           *cp++ = dig + '0';
+#else
+#error unsupported NBASE
+#endif
        }
+       cp = endcp;
    }
 
    /*
@@ -2662,23 +2769,21 @@ get_str_from_var(NumericVar *var, int dscale)
 }
 
 
-/* ----------
+/*
  * make_result() -
  *
  * Create the packed db numeric format in palloc()'d memory from
- * a variable.  The var's rscale determines the number of digits kept.
- * ----------
+ * a variable.
  */
 static Numeric
 make_result(NumericVar *var)
 {
    Numeric     result;
-   NumericDigit *digit = var->digits;
+   NumericDigit *digits = var->digits;
    int         weight = var->weight;
    int         sign = var->sign;
    int         n;
-   int         i,
-               j;
+   Size        len;
 
    if (sign == NUMERIC_NAN)
    {
@@ -2686,24 +2791,23 @@ make_result(NumericVar *var)
 
        result->varlen = NUMERIC_HDRSZ;
        result->n_weight = 0;
-       result->n_rscale = 0;
        result->n_sign_dscale = NUMERIC_NAN;
 
        dump_numeric("make_result()", result);
        return result;
    }
 
-   n = Max(0, Min(var->ndigits, var->weight + var->rscale + 1));
+   n = var->ndigits;
 
    /* truncate leading zeroes */
-   while (n > 0 && *digit == 0)
+   while (n > 0 && *digits == 0)
    {
-       digit++;
+       digits++;
        weight--;
        n--;
    }
    /* truncate trailing zeroes */
-   while (n > 0 && digit[n - 1] == 0)
+   while (n > 0 && digits[n - 1] == 0)
        n--;
 
    /* If zero result, force to weight=0 and positive sign */
@@ -2713,42 +2817,38 @@ make_result(NumericVar *var)
        sign = NUMERIC_POS;
    }
 
-   result = (Numeric) palloc(NUMERIC_HDRSZ + (n + 1) / 2);
-   result->varlen = NUMERIC_HDRSZ + (n + 1) / 2;
+   /* Build the result */
+   len = NUMERIC_HDRSZ + n * sizeof(NumericDigit);
+   result = (Numeric) palloc(len);
+   result->varlen = len;
    result->n_weight = weight;
-   result->n_rscale = var->rscale;
-   result->n_sign_dscale = sign |
-       ((uint16) var->dscale & NUMERIC_DSCALE_MASK);
+   result->n_sign_dscale = sign | (var->dscale & NUMERIC_DSCALE_MASK);
 
-   i = 0;
-   j = 0;
-   while (j < n)
-   {
-       unsigned char digitpair = digit[j++] << 4;
+   memcpy(result->n_data, digits, n * sizeof(NumericDigit));
 
-       if (j < n)
-           digitpair |= digit[j++];
-       result->n_data[i++] = digitpair;
-   }
+   /* Check for overflow of int16 fields */
+   if (result->n_weight != weight ||
+       NUMERIC_DSCALE(result) != var->dscale)
+       elog(ERROR, "Value overflows numeric format");
 
    dump_numeric("make_result()", result);
    return result;
 }
 
 
-/* ----------
+/*
  * apply_typmod() -
  *
  * Do bounds checking and rounding according to the attributes
  * typmod field.
- * ----------
  */
 static void
 apply_typmod(NumericVar *var, int32 typmod)
 {
    int         precision;
    int         scale;
-   int         maxweight;
+   int         maxdigits;
+   int         ddigits;
    int         i;
 
    /* Do nothing if we have a default typmod (-1) */
@@ -2758,34 +2858,10 @@ apply_typmod(NumericVar *var, int32 typmod)
    typmod -= VARHDRSZ;
    precision = (typmod >> 16) & 0xffff;
    scale = typmod & 0xffff;
-   maxweight = precision - scale;
-
-   /* Round to target scale */
-   i = scale + var->weight + 1;
-   if (i >= 0 && var->ndigits > i)
-   {
-       int         carry = (var->digits[i] > 4) ? 1 : 0;
-
-       var->ndigits = i;
+   maxdigits = precision - scale;
 
-       while (carry)
-       {
-           carry += var->digits[--i];
-           var->digits[i] = carry % 10;
-           carry /= 10;
-       }
-
-       if (i < 0)
-       {
-           Assert(i == -1);    /* better not have added more than 1 digit */
-           Assert(var->digits > var->buf);
-           var->digits--;
-           var->ndigits++;
-           var->weight++;
-       }
-   }
-   else
-       var->ndigits = Max(0, Min(i, var->ndigits));
+   /* Round to target scale (and set var->dscale) */
+   round_var(var, scale);
 
    /*
     * Check for overflow - note we can't do this before rounding, because
@@ -2794,30 +2870,146 @@ apply_typmod(NumericVar *var, int32 typmod)
     * storage but perhaps might not have been yet. In any case, we must
     * recognize a true zero, whose weight doesn't mean anything.
     */
-   if (var->weight >= maxweight)
+   ddigits = (var->weight + 1) * DEC_DIGITS;
+   if (ddigits > maxdigits)
    {
        /* Determine true weight; and check for all-zero result */
-       int         tweight = var->weight;
-
        for (i = 0; i < var->ndigits; i++)
        {
-           if (var->digits[i])
+           NumericDigit dig = var->digits[i];
+
+           if (dig)
+           {
+               /* Adjust for any high-order decimal zero digits */
+#if DEC_DIGITS == 4
+               if (dig < 10)
+                   ddigits -= 3;
+               else if (dig < 100)
+                   ddigits -= 2;
+               else if (dig < 1000)
+                   ddigits -= 1;
+#elif DEC_DIGITS == 2
+               if (dig < 10)
+                   ddigits -= 1;
+#elif DEC_DIGITS == 1
+               /* no adjustment */
+#else
+#error unsupported NBASE
+#endif
+               if (ddigits > maxdigits)
+                   elog(ERROR, "overflow on numeric "
+                        "ABS(value) >= 10^%d for field with precision %d scale %d",
+                        ddigits-1, precision, scale);
                break;
-           tweight--;
+           }
+           ddigits -= DEC_DIGITS;
        }
+   }
+}
+
+/*
+ * Convert numeric to int8, rounding if needed.
+ *
+ * If overflow, return FALSE (no error is raised).  Return TRUE if okay.
+ *
+ * CAUTION: var's contents may be modified by rounding!
+ */
+static bool
+numericvar_to_int8(NumericVar *var, int64 *result)
+{
+   NumericDigit *digits;
+   int         ndigits;
+   int         i;
+   int64       val,
+               oldval;
+   bool        neg;
+
+   /* Round to nearest integer */
+   round_var(var, 0);
 
-       if (tweight >= maxweight && i < var->ndigits)
-           elog(ERROR, "overflow on numeric "
-             "ABS(value) >= 10^%d for field with precision %d scale %d",
-                tweight, precision, scale);
+   /* Check for zero input */
+   strip_var(var);
+   ndigits = var->ndigits;
+   if (ndigits == 0)
+   {
+       *result = 0;
+       return true;
    }
 
-   var->rscale = scale;
-   var->dscale = scale;
+   /* Construct the result */
+   digits = var->digits;
+   neg = (var->sign == NUMERIC_NEG);
+   val = digits[0];
+   for (i = 1; i < ndigits; i++)
+   {
+       oldval = val;
+       val *= NBASE;
+       val += digits[i];
+       /*
+        * The overflow check is a bit tricky because we want to accept
+        * INT64_MIN, which will overflow the positive accumulator.  We
+        * can detect this case easily though because INT64_MIN is the
+        * only nonzero value for which -val == val (on a two's complement
+        * machine, anyway).
+        */
+       if ((val / NBASE) != oldval)    /* possible overflow? */
+       {
+           if (!neg || (-val) != val || val == 0 || oldval < 0)
+               return false;
+       }
+   }
+
+   *result = neg ? -val : val;
+   return true;
 }
 
-/* Convert numeric to float8; if out of range, return +/- HUGE_VAL */
-/* Caller should have eliminated the possibility of NAN */
+/*
+ * Convert int8 value to numeric.
+ */
+static void
+int8_to_numericvar(int64 val, NumericVar *var)
+{
+   uint64      uval,
+               newuval;
+   NumericDigit *ptr;
+   int         ndigits;
+
+   /* int8 can require at most 19 decimal digits; add one for safety */
+   alloc_var(var, 20/DEC_DIGITS);
+   if (val < 0)
+   {
+       var->sign = NUMERIC_NEG;
+       uval = -val;
+   }
+   else
+   {
+       var->sign = NUMERIC_POS;
+       uval = val;
+   }
+   var->dscale = 0;
+   if (val == 0)
+   {
+       var->ndigits = 0;
+       var->weight = 0;
+       return;
+   }
+   ptr = var->digits + var->ndigits;
+   ndigits = 0;
+   do {
+       ptr--;
+       ndigits++;
+       newuval = uval / NBASE;
+       *ptr = uval - newuval * NBASE;
+       uval = newuval;
+   } while (uval);
+   var->digits = ptr;
+   var->ndigits = ndigits;
+   var->weight = ndigits - 1;
+}
+
+/*
+ * Convert numeric to float8; if out of range, return +/- HUGE_VAL
+ */
 static double
 numeric_to_double_no_overflow(Numeric num)
 {
@@ -2865,11 +3057,11 @@ numericvar_to_double_no_overflow(NumericVar *var)
 }
 
 
-/* ----------
+/*
  * cmp_var() -
  *
- * Compare two values on variable level
- * ----------
+ * Compare two values on variable level.  We assume zeroes have been
+ * truncated to no digits.
  */
 static int
 cmp_var(NumericVar *var1, NumericVar *var2)
@@ -2903,12 +3095,11 @@ cmp_var(NumericVar *var1, NumericVar *var2)
 }
 
 
-/* ----------
+/*
  * add_var() -
  *
  * Full version of add functionality on variable level (handling signs).
  * result might point to one of the operands too without danger.
- * ----------
  */
 static void
 add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
@@ -2941,7 +3132,6 @@ add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
                     * ----------
                     */
                    zero_var(result);
-                   result->rscale = Max(var1->rscale, var2->rscale);
                    result->dscale = Max(var1->dscale, var2->dscale);
                    break;
 
@@ -2985,7 +3175,6 @@ add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
                     * ----------
                     */
                    zero_var(result);
-                   result->rscale = Max(var1->rscale, var2->rscale);
                    result->dscale = Max(var1->dscale, var2->dscale);
                    break;
 
@@ -3024,12 +3213,11 @@ add_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
 }
 
 
-/* ----------
+/*
  * sub_var() -
  *
  * Full version of sub functionality on variable level (handling signs).
  * result might point to one of the operands too without danger.
- * ----------
  */
 static void
 sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
@@ -3065,7 +3253,6 @@ sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
                     * ----------
                     */
                    zero_var(result);
-                   result->rscale = Max(var1->rscale, var2->rscale);
                    result->dscale = Max(var1->dscale, var2->dscale);
                    break;
 
@@ -3109,7 +3296,6 @@ sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
                     * ----------
                     */
                    zero_var(result);
-                   result->rscale = Max(var1->rscale, var2->rscale);
                    result->dscale = Max(var1->dscale, var2->dscale);
                    break;
 
@@ -3148,304 +3334,430 @@ sub_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
 }
 
 
-/* ----------
+/*
  * mul_var() -
  *
  * Multiplication on variable level. Product of var1 * var2 is stored
- * in result.  Accuracy of result is determined by global_rscale.
- * ----------
+ * in result.  Result is rounded to no more than rscale fractional digits.
  */
 static void
-mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
+mul_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+       int rscale)
 {
-   NumericDigit *res_buf;
-   NumericDigit *res_digits;
    int         res_ndigits;
-   int         res_weight;
    int         res_sign;
+   int         res_weight;
+   int         maxdigits;
+   int        *dig;
+   int         carry;
+   int         maxdig;
+   int         newdig;
+   NumericDigit *res_digits;
    int         i,
                ri,
                i1,
                i2;
-   long        sum = 0;
+   /* copy these values into local vars for speed in inner loop */
+   int         var1ndigits = var1->ndigits;
+   int         var2ndigits = var2->ndigits;
+   NumericDigit *var1digits = var1->digits;
+   NumericDigit *var2digits = var2->digits;
 
-   res_weight = var1->weight + var2->weight + 2;
-   res_ndigits = var1->ndigits + var2->ndigits + 1;
+   if (var1ndigits == 0 || var2ndigits == 0)
+   {
+       /* one or both inputs is zero; so is result */
+       zero_var(result);
+       result->dscale = rscale;
+       return;
+   }
+
+   /* Determine result sign and (maximum possible) weight */
    if (var1->sign == var2->sign)
        res_sign = NUMERIC_POS;
    else
        res_sign = NUMERIC_NEG;
+   res_weight = var1->weight + var2->weight + 2;
 
-   res_buf = digitbuf_alloc(res_ndigits);
-   res_digits = res_buf;
-   memset(res_digits, 0, res_ndigits);
-
-   ri = res_ndigits;
-   for (i1 = var1->ndigits - 1; i1 >= 0; i1--)
+   /*
+    * Determine number of result digits to compute.  If the exact result
+    * would have more than rscale fractional digits, truncate the computation
+    * with MUL_GUARD_DIGITS guard digits.  We do that by pretending that
+    * one or both inputs have fewer digits than they really do.
+    */
+   res_ndigits = var1ndigits + var2ndigits + 1;
+   maxdigits = res_weight + 1 + (rscale * DEC_DIGITS) + MUL_GUARD_DIGITS;
+   if (res_ndigits > maxdigits)
    {
-       sum = 0;
-       i = --ri;
-
-       for (i2 = var2->ndigits - 1; i2 >= 0; i2--)
+       if (maxdigits < 3)
+       {
+           /* no useful precision at all in the result... */
+           zero_var(result);
+           result->dscale = rscale;
+           return;
+       }
+       /* force maxdigits odd so that input ndigits can be equal */
+       if ((maxdigits & 1) == 0)
+           maxdigits++;
+       if (var1ndigits > var2ndigits)
+       {
+           var1ndigits -= res_ndigits - maxdigits;
+           if (var1ndigits < var2ndigits)
+               var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
+       }
+       else
        {
-           sum += res_digits[i] + var1->digits[i1] * var2->digits[i2];
-           res_digits[i--] = sum % 10;
-           sum /= 10;
+           var2ndigits -= res_ndigits - maxdigits;
+           if (var2ndigits < var1ndigits)
+               var1ndigits = var2ndigits = (var1ndigits + var2ndigits) / 2;
        }
-       res_digits[i] = sum;
+       res_ndigits = maxdigits;
+       Assert(res_ndigits == var1ndigits + var2ndigits + 1);
    }
 
-   i = res_weight + global_rscale + 2;
-   if (i >= 0 && i < res_ndigits)
+   /*
+    * We do the arithmetic in an array "dig[]" of signed int's.  Since
+    * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
+    * to avoid normalizing carries immediately.
+    *
+    * maxdig tracks the maximum possible value of any dig[] entry;
+    * when this threatens to exceed INT_MAX, we take the time to propagate
+    * carries.  To avoid overflow in maxdig itself, it actually represents
+    * the max possible value divided by NBASE-1.
+    */
+   dig = (int *) palloc0(res_ndigits * sizeof(int));
+   maxdig = 0;
+
+   ri = res_ndigits - 1;
+   for (i1 = var1ndigits - 1; i1 >= 0; ri--, i1--)
    {
-       sum = (res_digits[i] > 4) ? 1 : 0;
-       res_ndigits = i;
-       i--;
-       while (sum)
+       int     var1digit = var1digits[i1];
+
+       if (var1digit == 0)
+           continue;
+
+       /* Time to normalize? */
+       maxdig += var1digit;
+       if (maxdig > INT_MAX/(NBASE-1))
+       {
+           /* Yes, do it */
+           carry = 0;
+           for (i = res_ndigits-1; i >= 0; i--)
+           {
+               newdig = dig[i] + carry;
+               if (newdig >= NBASE)
+               {
+                   carry = newdig/NBASE;
+                   newdig -= carry*NBASE;
+               }
+               else
+                   carry = 0;
+               dig[i] = newdig;
+           }
+           Assert(carry == 0);
+           /* Reset maxdig to indicate new worst-case */
+           maxdig = 1 + var1digit;
+       }
+
+       /* Add appropriate multiple of var2 into the accumulator */
+       i = ri;
+       for (i2 = var2ndigits - 1; i2 >= 0; i2--)
        {
-           sum += res_digits[i];
-           res_digits[i--] = sum % 10;
-           sum /= 10;
+           dig[i--] += var1digit * var2digits[i2];
        }
    }
 
-   while (res_ndigits > 0 && *res_digits == 0)
+   /*
+    * Now we do a final carry propagation pass to normalize the result,
+    * which we combine with storing the result digits into the output.
+    * Note that this is still done at full precision w/guard digits.
+    */
+   alloc_var(result, res_ndigits);
+   res_digits = result->digits;
+   carry = 0;
+   for (i = res_ndigits-1; i >= 0; i--)
    {
-       res_digits++;
-       res_weight--;
-       res_ndigits--;
+       newdig = dig[i] + carry;
+       if (newdig >= NBASE)
+       {
+           carry = newdig/NBASE;
+           newdig -= carry*NBASE;
+       }
+       else
+           carry = 0;
+       res_digits[i] = newdig;
    }
-   while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0)
-       res_ndigits--;
+   Assert(carry == 0);
 
-   if (res_ndigits == 0)
-   {
-       res_sign = NUMERIC_POS;
-       res_weight = 0;
-   }
+   pfree(dig);
 
-   digitbuf_free(result->buf);
-   result->buf = res_buf;
-   result->digits = res_digits;
-   result->ndigits = res_ndigits;
+   /*
+    * Finally, round the result to the requested precision.
+    */
    result->weight = res_weight;
-   result->rscale = global_rscale;
    result->sign = res_sign;
+
+   /* Round to target rscale (and set result->dscale) */
+   round_var(result, rscale);
+
+   /* Strip leading and trailing zeroes */
+   strip_var(result);
 }
 
 
-/* ----------
+/*
  * div_var() -
  *
- * Division on variable level.  Accuracy of result is determined by
- * global_rscale.
- * ----------
+ * Division on variable level. Quotient of var1 / var2 is stored
+ * in result.  Result is rounded to no more than rscale fractional digits.
  */
 static void
-div_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
+div_var(NumericVar *var1, NumericVar *var2, NumericVar *result,
+       int rscale)
 {
-   NumericDigit *res_digits;
-   int         res_ndigits;
+   int         div_ndigits;
    int         res_sign;
    int         res_weight;
-   NumericVar  dividend;
-   NumericVar  divisor[10];
-   int         ndigits_tmp;
-   int         weight_tmp;
-   int         rscale_tmp;
-   int         ri;
+   int        *div;
+   int         qdigit;
+   int         carry;
+   int         maxdiv;
+   int         newdig;
+   NumericDigit *res_digits;
+   double      fdividend,
+               fdivisor,
+               fdivisorinverse,
+               fquotient;
+   int         qi;
    int         i;
-   long        guess;
-   long        first_have;
-   long        first_div;
-   int         first_nextdigit;
-   int         stat = 0;
+   /* copy these values into local vars for speed in inner loop */
+   int         var1ndigits = var1->ndigits;
+   int         var2ndigits = var2->ndigits;
+   NumericDigit *var1digits = var1->digits;
+   NumericDigit *var2digits = var2->digits;
 
    /*
-    * First of all division by zero check
+    * First of all division by zero check; we must not be handed an
+    * unnormalized divisor.
     */
-   ndigits_tmp = var2->ndigits + 1;
-   if (ndigits_tmp == 1)
+   if (var2ndigits == 0 || var2digits[0] == 0)
        elog(ERROR, "division by zero");
 
-   /*
-    * Determine the result sign, weight and number of digits to calculate
-    */
-   if (var1->sign == var2->sign)
-       res_sign = NUMERIC_POS;
-   else
-       res_sign = NUMERIC_NEG;
-   res_weight = var1->weight - var2->weight + 1;
-   res_ndigits = global_rscale + res_weight;
-   if (res_ndigits <= 0)
-       res_ndigits = 1;
-
    /*
     * Now result zero check
     */
-   if (var1->ndigits == 0)
+   if (var1ndigits == 0)
    {
        zero_var(result);
-       result->rscale = global_rscale;
+       result->dscale = rscale;
        return;
    }
 
    /*
-    * Initialize local variables
+    * Determine the result sign, weight and number of digits to calculate
     */
-   init_var(&dividend);
-   for (i = 1; i < 10; i++)
-       init_var(&divisor[i]);
+   if (var1->sign == var2->sign)
+       res_sign = NUMERIC_POS;
+   else
+       res_sign = NUMERIC_NEG;
+   res_weight = var1->weight - var2->weight + 1;
+   /* The number of accurate result digits we need to produce: */
+   div_ndigits = res_weight + 1 + (rscale + DEC_DIGITS-1)/DEC_DIGITS;
+   /* Add guard digits for roundoff error */
+   div_ndigits += DIV_GUARD_DIGITS;
+   if (div_ndigits < DIV_GUARD_DIGITS)
+       div_ndigits = DIV_GUARD_DIGITS;
+   /* Must be at least var1ndigits, too, to simplify data-loading loop */
+   if (div_ndigits < var1ndigits)
+       div_ndigits = var1ndigits;
 
    /*
-    * Make a copy of the divisor which has one leading zero digit
+    * We do the arithmetic in an array "div[]" of signed int's.  Since
+    * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom
+    * to avoid normalizing carries immediately.
+    *
+    * We start with div[] containing one zero digit followed by the
+    * dividend's digits (plus appended zeroes to reach the desired
+    * precision including guard digits).  Each step of the main loop
+    * computes an (approximate) quotient digit and stores it into div[],
+    * removing one position of dividend space.  A final pass of carry
+    * propagation takes care of any mistaken quotient digits.
     */
-   divisor[1].ndigits = ndigits_tmp;
-   divisor[1].rscale = var2->ndigits;
-   divisor[1].sign = NUMERIC_POS;
-   divisor[1].buf = digitbuf_alloc(ndigits_tmp);
-   divisor[1].digits = divisor[1].buf;
-   divisor[1].digits[0] = 0;
-   memcpy(&(divisor[1].digits[1]), var2->digits, ndigits_tmp - 1);
+   div = (int *) palloc0((div_ndigits + 1) * sizeof(int));
+   for (i = 0; i < var1ndigits; i++)
+       div[i+1] = var1digits[i];
 
    /*
-    * Make a copy of the dividend
+    * We estimate each quotient digit using floating-point arithmetic,
+    * taking the first four digits of the (current) dividend and divisor.
+    * This must be float to avoid overflow.
     */
-   dividend.ndigits = var1->ndigits;
-   dividend.weight = 0;
-   dividend.rscale = var1->ndigits;
-   dividend.sign = NUMERIC_POS;
-   dividend.buf = digitbuf_alloc(var1->ndigits);
-   dividend.digits = dividend.buf;
-   memcpy(dividend.digits, var1->digits, var1->ndigits);
+   fdivisor = (double) var2digits[0];
+   for (i = 1; i < 4; i++)
+   {
+       fdivisor *= NBASE;
+       if (i < var2ndigits)
+           fdivisor += (double) var2digits[i];
+   }
+   fdivisorinverse = 1.0 / fdivisor;
 
    /*
-    * Setup the result
+    * maxdiv tracks the maximum possible absolute value of any div[] entry;
+    * when this threatens to exceed INT_MAX, we take the time to propagate
+    * carries.  To avoid overflow in maxdiv itself, it actually represents
+    * the max possible abs. value divided by NBASE-1.
     */
-   digitbuf_free(result->buf);
-   result->buf = digitbuf_alloc(res_ndigits + 2);
-   res_digits = result->buf;
-   result->digits = res_digits;
-   result->ndigits = res_ndigits;
-   result->weight = res_weight;
-   result->rscale = global_rscale;
-   result->sign = res_sign;
-   res_digits[0] = 0;
-
-   first_div = divisor[1].digits[1] * 10;
-   if (ndigits_tmp > 2)
-       first_div += divisor[1].digits[2];
+   maxdiv = 1;
 
-   first_have = 0;
-   first_nextdigit = 0;
-
-   weight_tmp = 1;
-   rscale_tmp = divisor[1].rscale;
-
-   for (ri = 0; ri <= res_ndigits; ri++)
+   /*
+    * Outer loop computes next quotient digit, which will go into div[qi]
+    */
+   for (qi = 0; qi < div_ndigits; qi++)
    {
-       first_have = first_have * 10;
-       if (first_nextdigit >= 0 && first_nextdigit < dividend.ndigits)
-           first_have += dividend.digits[first_nextdigit];
-       first_nextdigit++;
-
-       guess = (first_have * 10) / first_div + 1;
-       if (guess > 9)
-           guess = 9;
+       /* Approximate the current dividend value */
+       fdividend = (double) div[qi];
+       for (i = 1; i < 4; i++)
+       {
+           fdividend *= NBASE;
+           if (qi+i <= div_ndigits)
+               fdividend += (double) div[qi+i];
+       }
+       /* Compute the (approximate) quotient digit */
+       fquotient = fdividend * fdivisorinverse;
+       qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
+           (((int) fquotient) - 1); /* truncate towards -infinity */
 
-       while (guess > 0)
+       if (qdigit != 0)
        {
-           if (divisor[guess].buf == NULL)
+           /* Do we need to normalize now? */
+           maxdiv += Abs(qdigit);
+           if (maxdiv > INT_MAX/(NBASE-1))
            {
-               int         i;
-               long        sum = 0;
-
-               memcpy(&divisor[guess], &divisor[1], sizeof(NumericVar));
-               divisor[guess].buf = digitbuf_alloc(divisor[guess].ndigits);
-               divisor[guess].digits = divisor[guess].buf;
-               for (i = divisor[1].ndigits - 1; i >= 0; i--)
+               /* Yes, do it */
+               carry = 0;
+               for (i = div_ndigits; i > qi; i--)
                {
-                   sum += divisor[1].digits[i] * guess;
-                   divisor[guess].digits[i] = sum % 10;
-                   sum /= 10;
+                   newdig = div[i] + carry;
+                   if (newdig < 0)
+                   {
+                       carry = -((-newdig-1)/NBASE) - 1;
+                       newdig -= carry*NBASE;
+                   }
+                   else if (newdig >= NBASE)
+                   {
+                       carry = newdig/NBASE;
+                       newdig -= carry*NBASE;
+                   }
+                   else
+                       carry = 0;
+                   div[i] = newdig;
                }
+               newdig = div[qi] + carry;
+               div[qi] = newdig;
+               /*
+                * All the div[] digits except possibly div[qi] are now
+                * in the range 0..NBASE-1.
+                */
+               maxdiv = Abs(newdig) / (NBASE-1);
+               maxdiv = Max(maxdiv, 1);
+               /*
+                * Recompute the quotient digit since new info may have
+                * propagated into the top four dividend digits
+                */
+               fdividend = (double) div[qi];
+               for (i = 1; i < 4; i++)
+               {
+                   fdividend *= NBASE;
+                   if (qi+i <= div_ndigits)
+                       fdividend += (double) div[qi+i];
+               }
+               /* Compute the (approximate) quotient digit */
+               fquotient = fdividend * fdivisorinverse;
+               qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
+                   (((int) fquotient) - 1); /* truncate towards -infinity */
+               maxdiv += Abs(qdigit);
            }
 
-           divisor[guess].weight = weight_tmp;
-           divisor[guess].rscale = rscale_tmp;
-
-           stat = cmp_abs(&dividend, &divisor[guess]);
-           if (stat >= 0)
-               break;
-
-           guess--;
-       }
+           /* Subtract off the appropriate multiple of the divisor */
+           if (qdigit != 0)
+           {
+               int     istop = Min(var2ndigits, div_ndigits-qi+1);
 
-       res_digits[ri + 1] = guess;
-       if (stat == 0)
-       {
-           ri++;
-           break;
+               for (i = 0; i < istop; i++)
+                   div[qi+i] -= qdigit * var2digits[i];
+           }
        }
 
-       weight_tmp--;
-       rscale_tmp++;
-
-       if (guess == 0)
-           continue;
-
-       sub_abs(&dividend, &divisor[guess], &dividend);
+       /*
+        * The dividend digit we are about to replace might still be nonzero.
+        * Fold it into the next digit position.  We don't need to worry about
+        * overflow here since this should nearly cancel with the subtraction
+        * of the divisor.
+        */
+       div[qi+1] += div[qi] * NBASE;
 
-       first_nextdigit = dividend.weight - weight_tmp;
-       first_have = 0;
-       if (first_nextdigit >= 0 && first_nextdigit < dividend.ndigits)
-           first_have = dividend.digits[first_nextdigit];
-       first_nextdigit++;
+       div[qi] = qdigit;
    }
 
-   result->ndigits = ri + 1;
-   if (ri == res_ndigits + 1)
+   /*
+    * Approximate and store the last quotient digit (div[div_ndigits])
+    */
+   fdividend = (double) div[qi];
+   for (i = 1; i < 4; i++)
    {
-       int         carry = (res_digits[ri] > 4) ? 1 : 0;
-
-       result->ndigits = ri;
-       res_digits[ri] = 0;
+       fdividend *= NBASE;
+   }
+   fquotient = fdividend * fdivisorinverse;
+   qdigit = (fquotient >= 0.0) ? ((int) fquotient) :
+       (((int) fquotient) - 1); /* truncate towards -infinity */
+   div[qi] = qdigit;
 
-       while (carry && ri > 0)
+   /*
+    * Now we do a final carry propagation pass to normalize the result,
+    * which we combine with storing the result digits into the output.
+    * Note that this is still done at full precision w/guard digits.
+    */
+   alloc_var(result, div_ndigits+1);
+   res_digits = result->digits;
+   carry = 0;
+   for (i = div_ndigits; i >= 0; i--)
+   {
+       newdig = div[i] + carry;
+       if (newdig < 0)
        {
-           carry += res_digits[--ri];
-           res_digits[ri] = carry % 10;
-           carry /= 10;
+           carry = -((-newdig-1)/NBASE) - 1;
+           newdig -= carry*NBASE;
        }
+       else if (newdig >= NBASE)
+       {
+           carry = newdig/NBASE;
+           newdig -= carry*NBASE;
+       }
+       else
+           carry = 0;
+       res_digits[i] = newdig;
    }
+   Assert(carry == 0);
 
-   while (result->ndigits > 0 && *(result->digits) == 0)
-   {
-       (result->digits)++;
-       (result->weight)--;
-       (result->ndigits)--;
-   }
-   while (result->ndigits > 0 && result->digits[result->ndigits - 1] == 0)
-       (result->ndigits)--;
-   if (result->ndigits == 0)
-       result->sign = NUMERIC_POS;
+   pfree(div);
 
    /*
-    * Tidy up
+    * Finally, round the result to the requested precision.
     */
-   digitbuf_free(dividend.buf);
-   for (i = 1; i < 10; i++)
-       digitbuf_free(divisor[i].buf);
+   result->weight = res_weight;
+   result->sign = res_sign;
+
+   /* Round to target rscale (and set result->dscale) */
+   round_var(result, rscale);
+
+   /* Strip leading and trailing zeroes */
+   strip_var(result);
 }
 
 
 /*
  * Default scale selection for division
  *
- * Returns the appropriate display scale for the division result,
- * and sets global_rscale to the result scale to use during div_var.
- *
- * Note that this must be called before div_var.
+ * Returns the appropriate result scale for the division result.
  */
 static int
 select_div_scale(NumericVar *var1, NumericVar *var2)
@@ -3456,18 +3768,14 @@ select_div_scale(NumericVar *var1, NumericVar *var2)
                i;
    NumericDigit firstdigit1,
                firstdigit2;
-   int         res_dscale;
-   int         res_rscale;
+   int         rscale;
 
    /*
     * The result scale of a division isn't specified in any SQL standard.
-    * For PostgreSQL we select a display scale that will give at least
+    * For PostgreSQL we select a result scale that will give at least
     * NUMERIC_MIN_SIG_DIGITS significant digits, so that numeric gives a
     * result no less accurate than float8; but use a scale not less than
     * either input's display scale.
-    *
-    * The result scale is NUMERIC_EXTRA_DIGITS more than the display scale,
-    * to provide some guard digits in the calculation.
     */
 
    /* Get the actual (normalized) weight and first digit of each input */
@@ -3504,74 +3812,59 @@ select_div_scale(NumericVar *var1, NumericVar *var2)
    if (firstdigit1 <= firstdigit2)
        qweight--;
 
-   /* Select display scale */
-   res_dscale = NUMERIC_MIN_SIG_DIGITS - qweight;
-   res_dscale = Max(res_dscale, var1->dscale);
-   res_dscale = Max(res_dscale, var2->dscale);
-   res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE);
-   res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
-
    /* Select result scale */
-   res_rscale = res_dscale + NUMERIC_EXTRA_DIGITS;
-   global_rscale = res_rscale;
+   rscale = NUMERIC_MIN_SIG_DIGITS - qweight * DEC_DIGITS;
+   rscale = Max(rscale, var1->dscale);
+   rscale = Max(rscale, var2->dscale);
+   rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+   rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
 
-   return res_dscale;
+   return rscale;
 }
 
 
-/* ----------
+/*
  * mod_var() -
  *
  * Calculate the modulo of two numerics at variable level
- * ----------
  */
 static void
 mod_var(NumericVar *var1, NumericVar *var2, NumericVar *result)
 {
    NumericVar  tmp;
-   int         save_global_rscale;
-   int         div_dscale;
+   int         rscale;
 
    init_var(&tmp);
 
    /* ---------
     * We do this using the equation
     *      mod(x,y) = x - trunc(x/y)*y
-    * We set global_rscale the same way numeric_div and numeric_mul do
+    * We set rscale the same way numeric_div and numeric_mul do
     * to get the right answer from the equation.  The final result,
     * however, need not be displayed to more precision than the inputs.
     * ----------
     */
-   save_global_rscale = global_rscale;
-
-   div_dscale = select_div_scale(var1, var2);
-
-   div_var(var1, var2, &tmp);
+   rscale = select_div_scale(var1, var2);
 
-   tmp.dscale = div_dscale;
+   div_var(var1, var2, &tmp, rscale);
 
-   /* do trunc() by forgetting digits to the right of the decimal point */
-   tmp.ndigits = Max(0, Min(tmp.ndigits, tmp.weight + 1));
+   trunc_var(&tmp, 0);
 
-   global_rscale = var2->rscale + tmp.rscale;
-
-   mul_var(var2, &tmp, &tmp);
+   mul_var(var2, &tmp, &tmp, var2->dscale + tmp.dscale);
 
    sub_var(var1, &tmp, result);
 
-   result->dscale = Max(var1->dscale, var2->dscale);
+   round_var(result, Max(var1->dscale, var2->dscale));
 
-   global_rscale = save_global_rscale;
    free_var(&tmp);
 }
 
 
-/* ----------
+/*
  * ceil_var() -
  *
  * Return the smallest integer greater than or equal to the argument
  * on variable level
- * ----------
  */
 static void
 ceil_var(NumericVar *var, NumericVar *result)
@@ -3581,9 +3874,9 @@ ceil_var(NumericVar *var, NumericVar *result)
    init_var(&tmp);
    set_var_from_var(var, &tmp);
 
-   tmp.rscale = 0;
-   tmp.ndigits = Min(tmp.ndigits, Max(0, tmp.weight + 1));
-   if (tmp.sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
+   trunc_var(&tmp, 0);
+
+   if (var->sign == NUMERIC_POS && cmp_var(var, &tmp) != 0)
        add_var(&tmp, &const_one, &tmp);
 
    set_var_from_var(&tmp, result);
@@ -3591,12 +3884,11 @@ ceil_var(NumericVar *var, NumericVar *result)
 }
 
 
-/* ----------
+/*
  * floor_var() -
  *
  * Return the largest integer equal to or less than the argument
  * on variable level
- * ----------
  */
 static void
 floor_var(NumericVar *var, NumericVar *result)
@@ -3606,9 +3898,9 @@ floor_var(NumericVar *var, NumericVar *result)
    init_var(&tmp);
    set_var_from_var(var, &tmp);
 
-   tmp.rscale = 0;
-   tmp.ndigits = Min(tmp.ndigits, Max(0, tmp.weight + 1));
-   if (tmp.sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
+   trunc_var(&tmp, 0);
+
+   if (var->sign == NUMERIC_NEG && cmp_var(var, &tmp) != 0)
        sub_var(&tmp, &const_one, &tmp);
 
    set_var_from_var(&tmp, result);
@@ -3616,33 +3908,27 @@ floor_var(NumericVar *var, NumericVar *result)
 }
 
 
-/* ----------
+/*
  * sqrt_var() -
  *
  * Compute the square root of x using Newton's algorithm
- * ----------
  */
 static void
-sqrt_var(NumericVar *arg, NumericVar *result)
+sqrt_var(NumericVar *arg, NumericVar *result, int rscale)
 {
    NumericVar  tmp_arg;
    NumericVar  tmp_val;
    NumericVar  last_val;
-   int         res_rscale;
-   int         save_global_rscale;
+   int         local_rscale;
    int         stat;
 
-   save_global_rscale = global_rscale;
-   global_rscale += 8;
-   res_rscale = global_rscale;
+   local_rscale = rscale + 8;
 
    stat = cmp_var(arg, &const_zero);
    if (stat == 0)
    {
-       set_var_from_var(&const_zero, result);
-       result->rscale = res_rscale;
-       result->sign = NUMERIC_POS;
-       global_rscale = save_global_rscale;
+       zero_var(result);
+       result->dscale = rscale;
        return;
    }
 
@@ -3659,25 +3945,21 @@ sqrt_var(NumericVar *arg, NumericVar *result)
    /*
     * Initialize the result to the first guess
     */
-   digitbuf_free(result->buf);
-   result->buf = digitbuf_alloc(1);
-   result->digits = result->buf;
+   alloc_var(result, 1);
    result->digits[0] = tmp_arg.digits[0] / 2;
    if (result->digits[0] == 0)
        result->digits[0] = 1;
-   result->ndigits = 1;
    result->weight = tmp_arg.weight / 2;
-   result->rscale = res_rscale;
    result->sign = NUMERIC_POS;
 
    set_var_from_var(result, &last_val);
 
    for (;;)
    {
-       div_var(&tmp_arg, result, &tmp_val);
+       div_var(&tmp_arg, result, &tmp_val, local_rscale);
 
        add_var(result, &tmp_val, result);
-       div_var(result, &const_two, result);
+       mul_var(result, &const_zero_point_five, result, local_rscale);
 
        if (cmp_var(&last_val, result) == 0)
            break;
@@ -3688,37 +3970,34 @@ sqrt_var(NumericVar *arg, NumericVar *result)
    free_var(&tmp_val);
    free_var(&tmp_arg);
 
-   global_rscale = save_global_rscale;
-   div_var(result, &const_one, result);
+   /* Round to requested precision */
+   round_var(result, rscale);
 }
 
 
-/* ----------
+/*
  * exp_var() -
  *
  * Raise e to the power of x
- * ----------
  */
 static void
-exp_var(NumericVar *arg, NumericVar *result)
+exp_var(NumericVar *arg, NumericVar *result, int rscale)
 {
    NumericVar  x;
-   NumericVar  xpow;
-   NumericVar  ifac;
-   NumericVar  elem;
-   NumericVar  ni;
-   int         d;
-   int         i;
    int         xintval;
-   int         ndiv2 = 0;
    bool        xneg = FALSE;
-   int         save_global_rscale;
-
+   int         local_rscale;
+
+   /*----------
+    * We separate the integral and fraction parts of x, then compute
+    *      e^x = e^xint * e^xfrac
+    * where e = exp(1) and e^xfrac = exp(xfrac) are computed by
+    * exp_var_internal; the limited range of inputs allows that routine
+    * to do a good job with a simple Taylor series.  Raising e^xint is
+    * done by repeated multiplications in power_var_int.
+    *----------
+    */
    init_var(&x);
-   init_var(&xpow);
-   init_var(&ifac);
-   init_var(&elem);
-   init_var(&ni);
 
    set_var_from_var(arg, &x);
 
@@ -3728,26 +4007,88 @@ exp_var(NumericVar *arg, NumericVar *result)
        x.sign = NUMERIC_POS;
    }
 
-   /* Select an appropriate scale for internal calculation */
+   /* Extract the integer part, remove it from x */
    xintval = 0;
-   for (i = x.weight, d = 0; i >= 0; i--, d++)
+   while (x.weight >= 0)
    {
-       xintval *= 10;
-       if (d < x.ndigits)
-           xintval += x.digits[d];
-       if (xintval >= NUMERIC_MAX_RESULT_SCALE)
+       xintval *= NBASE;
+       if (x.ndigits > 0)
+       {
+           xintval += x.digits[0];
+           x.digits++;
+           x.ndigits--;
+       }
+       x.weight--;
+       /* Guard against overflow */
+       if (xintval >= NUMERIC_MAX_RESULT_SCALE * 3)
            elog(ERROR, "argument for EXP() too big");
    }
 
-   save_global_rscale = global_rscale;
-   global_rscale += xintval / 2 + 8;
+   /* Select an appropriate scale for internal calculation */
+   local_rscale = rscale + MUL_GUARD_DIGITS * 2;
+
+   /* Compute e^xfrac */
+   exp_var_internal(&x, result, local_rscale);
 
-   /* Reduce input into range 0 <= x <= 0.1 */
-   while (cmp_var(&x, &const_zero_point_one) > 0)
+   /* If there's an integer part, multiply by e^xint */
+   if (xintval > 0)
+   {
+       NumericVar  e;
+
+       init_var(&e);
+       exp_var_internal(&const_one, &e, local_rscale);
+       power_var_int(&e, xintval, &e, local_rscale);
+       mul_var(&e, result, result, local_rscale);
+       free_var(&e);
+   }
+
+   /* Compensate for input sign, and round to requested rscale */
+   if (xneg)
+       div_var(&const_one, result, result, rscale);
+   else
+       round_var(result, rscale);
+
+   free_var(&x);
+}
+
+
+/*
+ * exp_var_internal() -
+ *
+ * Raise e to the power of x, where 0 <= x <= 1
+ *
+ * NB: the result should be good to at least rscale digits, but it has
+ * *not* been rounded off; the caller must do that if wanted.
+ */
+static void
+exp_var_internal(NumericVar *arg, NumericVar *result, int rscale)
+{
+   NumericVar  x;
+   NumericVar  xpow;
+   NumericVar  ifac;
+   NumericVar  elem;
+   NumericVar  ni;
+   int         ndiv2 = 0;
+   int         local_rscale;
+
+   init_var(&x);
+   init_var(&xpow);
+   init_var(&ifac);
+   init_var(&elem);
+   init_var(&ni);
+
+   set_var_from_var(arg, &x);
+
+   Assert(x.sign == NUMERIC_POS);
+
+   local_rscale = rscale + 8;
+
+   /* Reduce input into range 0 <= x <= 0.01 */
+   while (cmp_var(&x, &const_zero_point_01) > 0)
    {
        ndiv2++;
-       global_rscale++;
-       div_var(&x, &const_two, &x);
+       local_rscale++;
+       mul_var(&x, &const_zero_point_five, &x, x.dscale+1);
    }
 
    /*
@@ -3756,7 +4097,7 @@ exp_var(NumericVar *arg, NumericVar *result)
     *      exp(x) = 1 + x + x^2/2! + x^3/3! + ...
     *
     * Given the limited range of x, this should converge reasonably quickly.
-    * We run the series until the terms fall below the global_rscale limit.
+    * We run the series until the terms fall below the local_rscale limit.
     */
    add_var(&const_one, &x, result);
    set_var_from_var(&x, &xpow);
@@ -3766,9 +4107,9 @@ exp_var(NumericVar *arg, NumericVar *result)
    for (;;)
    {
        add_var(&ni, &const_one, &ni);
-       mul_var(&xpow, &x, &xpow);
-       mul_var(&ifac, &ni, &ifac);
-       div_var(&xpow, &ifac, &elem);
+       mul_var(&xpow, &x, &xpow, local_rscale);
+       mul_var(&ifac, &ni, &ifac, 0);
+       div_var(&xpow, &ifac, &elem, local_rscale);
 
        if (elem.ndigits == 0)
            break;
@@ -3778,17 +4119,7 @@ exp_var(NumericVar *arg, NumericVar *result)
 
    /* Compensate for argument range reduction */
    while (ndiv2-- > 0)
-       mul_var(result, result, result);
-
-   /* Compensate for input sign, and round to caller's global_rscale */
-   global_rscale = save_global_rscale;
-
-   if (xneg)
-       div_var(&const_one, result, result);
-   else
-       div_var(result, &const_one, result);
-
-   result->sign = NUMERIC_POS;
+       mul_var(result, result, result, local_rscale);
 
    free_var(&x);
    free_var(&xpow);
@@ -3798,27 +4129,25 @@ exp_var(NumericVar *arg, NumericVar *result)
 }
 
 
-/* ----------
+/*
  * ln_var() -
  *
  * Compute the natural log of x
- * ----------
  */
 static void
-ln_var(NumericVar *arg, NumericVar *result)
+ln_var(NumericVar *arg, NumericVar *result, int rscale)
 {
    NumericVar  x;
    NumericVar  xx;
    NumericVar  ni;
    NumericVar  elem;
    NumericVar  fact;
-   int         save_global_rscale;
+   int         local_rscale;
 
    if (cmp_var(arg, &const_zero) <= 0)
        elog(ERROR, "math error on numeric - cannot compute LN of value <= zero");
 
-   save_global_rscale = global_rscale;
-   global_rscale += 8;
+   local_rscale = rscale + 8;
 
    init_var(&x);
    init_var(&xx);
@@ -3826,21 +4155,21 @@ ln_var(NumericVar *arg, NumericVar *result)
    init_var(&elem);
    init_var(&fact);
 
-   set_var_from_var(&const_two, &fact);
    set_var_from_var(arg, &x);
+   set_var_from_var(&const_two, &fact);
 
    /* Reduce input into range 0.9 < x < 1.1 */
    while (cmp_var(&x, &const_zero_point_nine) <= 0)
    {
-       global_rscale++;
-       sqrt_var(&x, &x);
-       mul_var(&fact, &const_two, &fact);
+       local_rscale++;
+       sqrt_var(&x, &x, local_rscale);
+       mul_var(&fact, &const_two, &fact, 0);
    }
    while (cmp_var(&x, &const_one_point_one) >= 0)
    {
-       global_rscale++;
-       sqrt_var(&x, &x);
-       mul_var(&fact, &const_two, &fact);
+       local_rscale++;
+       sqrt_var(&x, &x, local_rscale);
+       mul_var(&fact, &const_two, &fact, 0);
    }
 
    /*
@@ -3856,31 +4185,29 @@ ln_var(NumericVar *arg, NumericVar *result)
     */
    sub_var(&x, &const_one, result);
    add_var(&x, &const_one, &elem);
-   div_var(result, &elem, result);
+   div_var(result, &elem, result, local_rscale);
    set_var_from_var(result, &xx);
-   mul_var(result, result, &x);
+   mul_var(result, result, &x, local_rscale);
 
    set_var_from_var(&const_one, &ni);
 
    for (;;)
    {
        add_var(&ni, &const_two, &ni);
-       mul_var(&xx, &x, &xx);
-       div_var(&xx, &ni, &elem);
+       mul_var(&xx, &x, &xx, local_rscale);
+       div_var(&xx, &ni, &elem, local_rscale);
 
        if (elem.ndigits == 0)
            break;
 
        add_var(result, &elem, result);
 
-       if (elem.weight < (result->weight - 2 * global_rscale))
+       if (elem.weight < (result->weight - local_rscale * 2/DEC_DIGITS))
            break;
    }
 
-   /* Compensate for argument range reduction, round to caller's rscale */
-   global_rscale = save_global_rscale;
-
-   mul_var(result, &fact, result);
+   /* Compensate for argument range reduction, round to requested rscale */
+   mul_var(result, &fact, result, rscale);
 
    free_var(&x);
    free_var(&xx);
@@ -3890,105 +4217,137 @@ ln_var(NumericVar *arg, NumericVar *result)
 }
 
 
-/* ----------
+/*
  * log_var() -
  *
  * Compute the logarithm of num in a given base.
  *
- * Note: this routine chooses rscale and dscale of the result.
- * ----------
+ * Note: this routine chooses dscale of the result.
  */
 static void
 log_var(NumericVar *base, NumericVar *num, NumericVar *result)
 {
    NumericVar  ln_base;
    NumericVar  ln_num;
-   int         save_global_rscale = global_rscale;
-   int         res_dscale;
+   int         dec_digits;
+   int         rscale;
+   int         local_rscale;
 
    init_var(&ln_base);
    init_var(&ln_num);
 
-   /* Set scale for ln() calculations */
-   if (num->weight > 0)
-       res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(num->weight);
-   else if (num->weight < 0)
-       res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(- num->weight);
+   /* Set scale for ln() calculations --- compare numeric_ln() */
+
+   /* Approx decimal digits before decimal point */
+   dec_digits = (num->weight + 1) * DEC_DIGITS;
+
+   if (dec_digits > 1)
+       rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(dec_digits - 1);
+   else if (dec_digits < 1)
+       rscale = NUMERIC_MIN_SIG_DIGITS - (int) log10(1 - dec_digits);
    else
-       res_dscale = NUMERIC_MIN_SIG_DIGITS;
+       rscale = NUMERIC_MIN_SIG_DIGITS;
 
-   res_dscale = Max(res_dscale, base->dscale);
-   res_dscale = Max(res_dscale, num->dscale);
-   res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE);
-   res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
+   rscale = Max(rscale, base->dscale);
+   rscale = Max(rscale, num->dscale);
+   rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+   rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
 
-   global_rscale = res_dscale + 8;
+   local_rscale = rscale + 8;
 
    /* Form natural logarithms */
-   ln_var(base, &ln_base);
-   ln_var(num, &ln_num);
+   ln_var(base, &ln_base, local_rscale);
+   ln_var(num, &ln_num, local_rscale);
 
-   ln_base.dscale = res_dscale;
-   ln_num.dscale = res_dscale;
+   ln_base.dscale = rscale;
+   ln_num.dscale = rscale;
 
    /* Select scale for division result */
-   res_dscale = select_div_scale(&ln_num, &ln_base);
+   rscale = select_div_scale(&ln_num, &ln_base);
 
-   div_var(&ln_num, &ln_base, result);
-
-   result->dscale = res_dscale;
-
-   global_rscale = save_global_rscale;
+   div_var(&ln_num, &ln_base, result, rscale);
 
    free_var(&ln_num);
    free_var(&ln_base);
 }
 
 
-/* ----------
+/*
  * power_var() -
  *
  * Raise base to the power of exp
  *
- * Note: this routine chooses rscale and dscale of the result.
- * ----------
+ * Note: this routine chooses dscale of the result.
  */
 static void
 power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
 {
    NumericVar  ln_base;
    NumericVar  ln_num;
-   int         save_global_rscale = global_rscale;
-   int         res_dscale;
+   int         dec_digits;
+   int         rscale;
+   int         local_rscale;
    double      val;
 
+   /* If exp can be represented as an integer, use power_var_int */
+   if (exp->ndigits == 0 || exp->ndigits <= exp->weight + 1)
+   {
+       /* exact integer, but does it fit in int? */
+       NumericVar  x;
+       int64       expval64;
+
+       /* must copy because numericvar_to_int8() scribbles on input */
+       init_var(&x);
+       set_var_from_var(exp, &x);
+       if (numericvar_to_int8(&x, &expval64))
+       {
+           int     expval = (int) expval64;
+
+           /* Test for overflow by reverse-conversion. */
+           if ((int64) expval == expval64)
+           {
+               /* Okay, select rscale */
+               rscale = NUMERIC_MIN_SIG_DIGITS;
+               rscale = Max(rscale, base->dscale);
+               rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+               rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
+
+               power_var_int(base, expval, result, rscale);
+
+               free_var(&x);
+               return;
+           }
+       }
+       free_var(&x);
+   }
+
    init_var(&ln_base);
    init_var(&ln_num);
 
    /* Set scale for ln() calculation --- need extra accuracy here */
-   if (base->weight > 0)
-       res_dscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(base->weight);
-   else if (base->weight < 0)
-       res_dscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(- base->weight);
-   else
-       res_dscale = NUMERIC_MIN_SIG_DIGITS*2;
 
-   res_dscale = Max(res_dscale, base->dscale * 2);
-   res_dscale = Max(res_dscale, exp->dscale * 2);
-   res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE);
-   res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
+   /* Approx decimal digits before decimal point */
+   dec_digits = (base->weight + 1) * DEC_DIGITS;
 
-   global_rscale = res_dscale + 8;
+   if (dec_digits > 1)
+       rscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(dec_digits - 1);
+   else if (dec_digits < 1)
+       rscale = NUMERIC_MIN_SIG_DIGITS*2 - (int) log10(1 - dec_digits);
+   else
+       rscale = NUMERIC_MIN_SIG_DIGITS*2;
 
-   ln_var(base, &ln_base);
+   rscale = Max(rscale, base->dscale * 2);
+   rscale = Max(rscale, exp->dscale * 2);
+   rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE * 2);
+   rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE * 2);
 
-   ln_base.dscale = res_dscale;
+   local_rscale = rscale + 8;
 
-   mul_var(&ln_base, exp, &ln_num);
+   ln_var(base, &ln_base, local_rscale);
 
-   ln_num.dscale = res_dscale;
+   mul_var(&ln_base, exp, &ln_num, local_rscale);
 
-   /* Set scale for exp() */
+   /* Set scale for exp() -- compare numeric_exp() */
 
    /* convert input to float8, ignoring overflow */
    val = numericvar_to_double_no_overflow(&ln_num);
@@ -4000,22 +4359,86 @@ power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
    val = Max(val, -NUMERIC_MAX_RESULT_SCALE);
    val = Min(val, NUMERIC_MAX_RESULT_SCALE);
 
-   res_dscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
-   res_dscale = Max(res_dscale, base->dscale);
-   res_dscale = Max(res_dscale, exp->dscale);
-   res_dscale = Max(res_dscale, NUMERIC_MIN_DISPLAY_SCALE);
-   res_dscale = Min(res_dscale, NUMERIC_MAX_DISPLAY_SCALE);
+   rscale = NUMERIC_MIN_SIG_DIGITS - (int) val;
+   rscale = Max(rscale, base->dscale);
+   rscale = Max(rscale, exp->dscale);
+   rscale = Max(rscale, NUMERIC_MIN_DISPLAY_SCALE);
+   rscale = Min(rscale, NUMERIC_MAX_DISPLAY_SCALE);
 
-   global_rscale = res_dscale + 8;
+   exp_var(&ln_num, result, rscale);
 
-   exp_var(&ln_num, result);
+   free_var(&ln_num);
+   free_var(&ln_base);
+}
 
-   result->dscale = res_dscale;
+/*
+ * power_var_int() -
+ *
+ * Raise base to the power of exp, where exp is an integer.
+ */
+static void
+power_var_int(NumericVar *base, int exp, NumericVar *result, int rscale)
+{
+   bool        neg;
+   NumericVar  base_prod;
+   int         local_rscale;
 
-   global_rscale = save_global_rscale;
+   /* Detect some special cases, particularly 0^0. */
 
-   free_var(&ln_num);
-   free_var(&ln_base);
+   switch (exp)
+   {
+       case 0:
+           if (base->ndigits == 0)
+               elog(ERROR, "zero raised to zero is undefined");
+           set_var_from_var(&const_one, result);
+           result->dscale = rscale; /* no need to round */
+           return;
+       case 1:
+           set_var_from_var(base, result);
+           round_var(result, rscale);
+           return;
+       case -1:
+           div_var(&const_one, base, result, rscale);
+           return;
+       case 2:
+           mul_var(base, base, result, rscale);
+           return;
+       default:
+           break;
+   }
+
+   /*
+    * The general case repeatedly multiplies base according to the
+    * bit pattern of exp.  We do the multiplications with some extra
+    * precision.
+    */
+   neg = (exp < 0);
+   exp = Abs(exp);
+
+   local_rscale = rscale + MUL_GUARD_DIGITS * 2;
+
+   init_var(&base_prod);
+   set_var_from_var(base, &base_prod);
+
+   if (exp & 1)
+       set_var_from_var(base, result);
+   else
+       set_var_from_var(&const_one, result);
+
+   while ((exp >>= 1) > 0)
+   {
+       mul_var(&base_prod, &base_prod, &base_prod, local_rscale);
+       if (exp & 1)
+           mul_var(&base_prod, result, result, local_rscale);
+   }
+
+   free_var(&base_prod);
+
+   /* Compensate for input sign, and round to requested rscale */
+   if (neg)
+       div_var(&const_one, result, result, rscale);
+   else
+       round_var(result, rscale);
 }
 
 
@@ -4040,30 +4463,36 @@ power_var(NumericVar *base, NumericVar *exp, NumericVar *result)
 static int
 cmp_abs(NumericVar *var1, NumericVar *var2)
 {
+   NumericDigit *var1digits = var1->digits;
+   NumericDigit *var2digits = var2->digits;
    int         i1 = 0;
    int         i2 = 0;
    int         w1 = var1->weight;
    int         w2 = var2->weight;
-   int         stat;
+
+   /* Check any digits before the first common digit */
 
    while (w1 > w2 && i1 < var1->ndigits)
    {
-       if (var1->digits[i1++] != 0)
+       if (var1digits[i1++] != 0)
            return 1;
        w1--;
    }
    while (w2 > w1 && i2 < var2->ndigits)
    {
-       if (var2->digits[i2++] != 0)
+       if (var2digits[i2++] != 0)
            return -1;
        w2--;
    }
 
+   /* At this point, either w1 == w2 or we've run out of digits */
+
    if (w1 == w2)
    {
        while (i1 < var1->ndigits && i2 < var2->ndigits)
        {
-           stat = var1->digits[i1++] - var2->digits[i2++];
+           int         stat = var1digits[i1++] - var2digits[i2++];
+
            if (stat)
            {
                if (stat > 0)
@@ -4073,14 +4502,18 @@ cmp_abs(NumericVar *var1, NumericVar *var2)
        }
    }
 
+   /*
+    * At this point, we've run out of digits on one side or the other;
+    * so any remaining nonzero digits imply that side is larger
+    */
    while (i1 < var1->ndigits)
    {
-       if (var1->digits[i1++] != 0)
+       if (var1digits[i1++] != 0)
            return 1;
    }
    while (i2 < var2->ndigits)
    {
-       if (var2->digits[i2++] != 0)
+       if (var2digits[i2++] != 0)
            return -1;
    }
 
@@ -4088,12 +4521,11 @@ cmp_abs(NumericVar *var1, NumericVar *var2)
 }
 
 
-/* ----------
+/*
  * add_abs() -
  *
  * Add the absolute values of two variables into result.
  * result might point to one of the operands without danger.
- * ----------
  */
 static void
 add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
@@ -4102,7 +4534,9 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
    NumericDigit *res_digits;
    int         res_ndigits;
    int         res_weight;
-   int         res_rscale;
+   int         res_rscale,
+               rscale1,
+               rscale2;
    int         res_dscale;
    int         i,
                i1,
@@ -4116,14 +4550,21 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
    NumericDigit *var2digits = var2->digits;
 
    res_weight = Max(var1->weight, var2->weight) + 1;
-   res_rscale = Max(var1->rscale, var2->rscale);
+
    res_dscale = Max(var1->dscale, var2->dscale);
+
+   /* Note: here we are figuring rscale in base-NBASE digits */
+   rscale1 = var1->ndigits - var1->weight - 1;
+   rscale2 = var2->ndigits - var2->weight - 1;
+   res_rscale = Max(rscale1, rscale2);
+
    res_ndigits = res_rscale + res_weight + 1;
    if (res_ndigits <= 0)
        res_ndigits = 1;
 
-   res_buf = digitbuf_alloc(res_ndigits);
-   res_digits = res_buf;
+   res_buf = digitbuf_alloc(res_ndigits + 1);
+   res_buf[0] = 0;             /* spare digit for later rounding */
+   res_digits = res_buf + 1;
 
    i1 = res_rscale + var1->weight + 1;
    i2 = res_rscale + var2->weight + 1;
@@ -4136,9 +4577,9 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
        if (i2 >= 0 && i2 < var2ndigits)
            carry += var2digits[i2];
 
-       if (carry >= 10)
+       if (carry >= NBASE)
        {
-           res_digits[i] = carry - 10;
+           res_digits[i] = carry - NBASE;
            carry = 1;
        }
        else
@@ -4150,37 +4591,26 @@ add_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
 
    Assert(carry == 0);         /* else we failed to allow for carry out */
 
-   while (res_ndigits > 0 && *res_digits == 0)
-   {
-       res_digits++;
-       res_weight--;
-       res_ndigits--;
-   }
-   while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0)
-       res_ndigits--;
-
-   if (res_ndigits == 0)
-       res_weight = 0;
-
    digitbuf_free(result->buf);
    result->ndigits = res_ndigits;
    result->buf = res_buf;
    result->digits = res_digits;
    result->weight = res_weight;
-   result->rscale = res_rscale;
    result->dscale = res_dscale;
+
+   /* Remove leading/trailing zeroes */
+   strip_var(result);
 }
 
 
-/* ----------
- * sub_abs() -
+/*
+ * sub_abs()
  *
  * Subtract the absolute value of var2 from the absolute value of var1
  * and store in result. result might point to one of the operands
  * without danger.
  *
  * ABS(var1) MUST BE GREATER OR EQUAL ABS(var2) !!!
- * ----------
  */
 static void
 sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
@@ -4189,7 +4619,9 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
    NumericDigit *res_digits;
    int         res_ndigits;
    int         res_weight;
-   int         res_rscale;
+   int         res_rscale,
+               rscale1,
+               rscale2;
    int         res_dscale;
    int         i,
                i1,
@@ -4203,14 +4635,21 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
    NumericDigit *var2digits = var2->digits;
 
    res_weight = var1->weight;
-   res_rscale = Max(var1->rscale, var2->rscale);
+
    res_dscale = Max(var1->dscale, var2->dscale);
+
+   /* Note: here we are figuring rscale in base-NBASE digits */
+   rscale1 = var1->ndigits - var1->weight - 1;
+   rscale2 = var2->ndigits - var2->weight - 1;
+   res_rscale = Max(rscale1, rscale2);
+
    res_ndigits = res_rscale + res_weight + 1;
    if (res_ndigits <= 0)
        res_ndigits = 1;
 
-   res_buf = digitbuf_alloc(res_ndigits);
-   res_digits = res_buf;
+   res_buf = digitbuf_alloc(res_ndigits + 1);
+   res_buf[0] = 0;             /* spare digit for later rounding */
+   res_digits = res_buf + 1;
 
    i1 = res_rscale + var1->weight + 1;
    i2 = res_rscale + var2->weight + 1;
@@ -4225,7 +4664,7 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
 
        if (borrow < 0)
        {
-           res_digits[i] = borrow + 10;
+           res_digits[i] = borrow + NBASE;
            borrow = -1;
        }
        else
@@ -4237,23 +4676,219 @@ sub_abs(NumericVar *var1, NumericVar *var2, NumericVar *result)
 
    Assert(borrow == 0);        /* else caller gave us var1 < var2 */
 
-   while (res_ndigits > 0 && *res_digits == 0)
-   {
-       res_digits++;
-       res_weight--;
-       res_ndigits--;
-   }
-   while (res_ndigits > 0 && res_digits[res_ndigits - 1] == 0)
-       res_ndigits--;
-
-   if (res_ndigits == 0)
-       res_weight = 0;
-
    digitbuf_free(result->buf);
    result->ndigits = res_ndigits;
    result->buf = res_buf;
    result->digits = res_digits;
    result->weight = res_weight;
-   result->rscale = res_rscale;
    result->dscale = res_dscale;
+
+   /* Remove leading/trailing zeroes */
+   strip_var(result);
+}
+
+/*
+ * round_var
+ *
+ * Round the value of a variable to no more than rscale decimal digits
+ * after the decimal point.  NOTE: we allow rscale < 0 here, implying
+ * rounding before the decimal point.
+ */
+static void
+round_var(NumericVar *var, int rscale)
+{
+   NumericDigit   *digits = var->digits;
+   int         di;
+   int         ndigits;
+   int         carry;
+
+   var->dscale = rscale;
+
+   /* decimal digits wanted */
+   di = (var->weight + 1) * DEC_DIGITS + rscale;
+
+   /*
+    * If di = 0, the value loses all digits, but could round up to 1
+    * if its first extra digit is >= 5.  If di < 0 the result must be 0.
+    */
+   if (di < 0)
+   {
+       var->ndigits = 0;
+       var->weight = 0;
+       var->sign = NUMERIC_POS;
+   }
+   else
+   {
+       /* NBASE digits wanted */
+       ndigits = (di + DEC_DIGITS-1) / DEC_DIGITS;
+
+       /* 0, or number of decimal digits to keep in last NBASE digit */
+       di %= DEC_DIGITS;
+
+       if (ndigits < var->ndigits ||
+           (ndigits == var->ndigits && di > 0))
+       {
+           var->ndigits = ndigits;
+
+#if DEC_DIGITS == 1
+           /* di must be zero */
+           carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
+#else
+           if (di == 0)
+           {
+               carry = (digits[ndigits] >= HALF_NBASE) ? 1 : 0;
+           }
+           else
+           {
+               /* Must round within last NBASE digit */
+               int     extra,
+                       pow10;
+
+#if DEC_DIGITS == 4
+               pow10 = round_powers[di];
+#elif DEC_DIGITS == 2
+               pow10 = 10;
+#else
+#error unsupported NBASE
+#endif
+               extra = digits[--ndigits] % pow10;
+               digits[ndigits] -= extra;
+               carry = 0;
+               if (extra >= pow10/2)
+               {
+                   pow10 += digits[ndigits];
+                   if (pow10 >= NBASE)
+                   {
+                       pow10 -= NBASE;
+                       carry = 1;
+                   }
+                   digits[ndigits] = pow10;
+               }
+           }
+#endif
+
+           /* Propagate carry if needed */
+           while (carry)
+           {
+               carry += digits[--ndigits];
+               if (carry >= NBASE)
+               {
+                   digits[ndigits] = carry - NBASE;
+                   carry = 1;
+               }
+               else
+               {
+                   digits[ndigits] = carry;
+                   carry = 0;
+               }
+           }
+
+           if (ndigits < 0)
+           {
+               Assert(ndigits == -1);  /* better not have added > 1 digit */
+               Assert(var->digits > var->buf);
+               var->digits--;
+               var->ndigits++;
+               var->weight++;
+           }
+       }
+   }
+}
+
+/*
+ * trunc_var
+ *
+ * Truncate the value of a variable at rscale decimal digits after the
+ * decimal point.  NOTE: we allow rscale < 0 here, implying
+ * truncation before the decimal point.
+ */
+static void
+trunc_var(NumericVar *var, int rscale)
+{
+   int         di;
+   int         ndigits;
+
+   var->dscale = rscale;
+
+   /* decimal digits wanted */
+   di = (var->weight + 1) * DEC_DIGITS + rscale;
+
+   /*
+    * If di <= 0, the value loses all digits.
+    */
+   if (di <= 0)
+   {
+       var->ndigits = 0;
+       var->weight = 0;
+       var->sign = NUMERIC_POS;
+   }
+   else
+   {
+       /* NBASE digits wanted */
+       ndigits = (di + DEC_DIGITS-1) / DEC_DIGITS;
+
+       if (ndigits <= var->ndigits)
+       {
+           var->ndigits = ndigits;
+
+#if DEC_DIGITS == 1
+           /* no within-digit stuff to worry about */
+#else
+           /* 0, or number of decimal digits to keep in last NBASE digit */
+           di %= DEC_DIGITS;
+
+           if (di > 0)
+           {
+               /* Must truncate within last NBASE digit */
+               NumericDigit   *digits = var->digits;
+               int     extra,
+                       pow10;
+
+#if DEC_DIGITS == 4
+               pow10 = round_powers[di];
+#elif DEC_DIGITS == 2
+               pow10 = 10;
+#else
+#error unsupported NBASE
+#endif
+               extra = digits[--ndigits] % pow10;
+               digits[ndigits] -= extra;
+           }
+#endif
+       }
+   }
+}
+
+/*
+ * strip_var
+ *
+ * Strip any leading and trailing zeroes from a numeric variable
+ */
+static void
+strip_var(NumericVar *var)
+{
+   NumericDigit   *digits = var->digits;
+   int         ndigits = var->ndigits;
+
+   /* Strip leading zeroes */
+   while (ndigits > 0 && *digits == 0)
+   {
+       digits++;
+       var->weight--;
+       ndigits--;
+   }
+
+   /* Strip trailing zeroes */
+   while (ndigits > 0 && digits[ndigits - 1] == 0)
+       ndigits--;
+
+   /* If it's zero, normalize the sign and weight */
+   if (ndigits == 0)
+   {
+       var->sign = NUMERIC_POS;
+       var->weight = 0;
+   }
+
+   var->digits = digits;
+   var->ndigits = ndigits;
 }
index 8cd3f1472bec06d62b88e4a55299aae86be1a125..1bbd5f6636613a37a799d0ca18e169ab941b94df 100644 (file)
@@ -37,7 +37,7 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catversion.h,v 1.181 2003/03/20 03:34:56 momjian Exp $
+ * $Id: catversion.h,v 1.182 2003/03/21 01:58:04 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 200303191
+#define CATALOG_VERSION_NO 200303201
 
 #endif
index a8148d59cea1697fe87080acb7c99dd08745fcf8..756d7c11232251b81183b5d496fc9f78b4b51e32 100644 (file)
@@ -1,15 +1,16 @@
-/* ----------
+/*-------------------------------------------------------------------------
+ *
  * numeric.h
+ *   Definitions for the exact numeric data type of Postgres
  *
- * Definitions for the exact numeric data type of Postgres
+ * Original coding 1998, Jan Wieck.  Heavily revised 2003, Tom Lane.
  *
- * 1998 Jan Wieck
+ * Copyright (c) 1998-2003, PostgreSQL Global Development Group
  *
- * $Id: numeric.h,v 1.16 2002/10/02 19:21:26 tgl Exp $
+ * $Id: numeric.h,v 1.17 2003/03/21 01:58:05 tgl Exp $
  *
- * ----------
+ *-------------------------------------------------------------------------
  */
-
 #ifndef _PG_NUMERIC_H_
 #define _PG_NUMERIC_H_
 
  */
 #define NUMERIC_MIN_SIG_DIGITS     16
 
-/*
- * Standard number of extra digits carried internally while doing
- * inexact calculations.
- */
-#define NUMERIC_EXTRA_DIGITS       4
-
 
 /*
  * Sign values and macros to deal with packing/unpacking n_sign_dscale
 #define NUMERIC_DSCALE_MASK 0x3FFF
 #define NUMERIC_SIGN(n)        ((n)->n_sign_dscale & NUMERIC_SIGN_MASK)
 #define NUMERIC_DSCALE(n)  ((n)->n_sign_dscale & NUMERIC_DSCALE_MASK)
-#define NUMERIC_IS_NAN(n)  (NUMERIC_SIGN(n) != NUMERIC_POS &&          \
-                               NUMERIC_SIGN(n) != NUMERIC_NEG)
+#define NUMERIC_IS_NAN(n)  (NUMERIC_SIGN(n) != NUMERIC_POS &&  \
+                            NUMERIC_SIGN(n) != NUMERIC_NEG)
 
 
 /*
  * The Numeric data type stored in the database
  *
  * NOTE: by convention, values in the packed form have been stripped of
- * all leading and trailing zeroes (except there will be a trailing zero
- * in the last byte, if the number of digits is odd).  In particular,
- * if the value is zero, there will be no digits at all!  The weight is
- * arbitrary in that case, but we normally set it to zero.
+ * all leading and trailing zero digits (where a "digit" is of base NBASE).
+ * In particular, if the value is zero, there will be no digits at all!
+ * The weight is arbitrary in that case, but we normally set it to zero.
  */
 typedef struct NumericData
 {
-   int32       varlen;         /* Variable size        */
+   int32       varlen;         /* Variable size (std varlena header) */
    int16       n_weight;       /* Weight of 1st digit  */
-   uint16      n_rscale;       /* Result scale         */
    uint16      n_sign_dscale;  /* Sign + display scale */
-   unsigned char n_data[1];    /* Digit data (2 decimal digits/byte) */
+   char        n_data[1];      /* Digits (really array of NumericDigit) */
 } NumericData;
+
 typedef NumericData *Numeric;
 
-#define NUMERIC_HDRSZ  (sizeof(int32) + sizeof(uint16) * 3)
+#define NUMERIC_HDRSZ  (sizeof(int32) + sizeof(int16) + sizeof(uint16))
 
 
 /*
index 5a6287551909ff1dc774ecac1c3abf3a4538ee49..9378ce7c9bc46224012b278c82fc683ce6135b27 100644 (file)
@@ -2,15 +2,15 @@
 -- AGGREGATES
 --
 SELECT avg(four) AS avg_1 FROM onek;
-        avg_1        
----------------------
- 1.50000000000000000
+       avg_1        
+--------------------
+ 1.5000000000000000
 (1 row)
 
 SELECT avg(a) AS avg_32 FROM aggtest WHERE a < 100;
-       avg_32       
---------------------
- 32.666666666666667
+       avg_32        
+---------------------
+ 32.6666666666666667
 (1 row)
 
 -- In 7.1, avg(float4) is computed using float8 arithmetic.
@@ -120,9 +120,9 @@ group by ten order by ten;
 (10 rows)
 
 SELECT newavg(four) AS avg_1 FROM onek;
-        avg_1        
----------------------
- 1.50000000000000000
+       avg_1        
+--------------------
+ 1.5000000000000000
 (1 row)
 
 SELECT newsum(four) AS sum_1500 FROM onek;
index d1797df490340330d6d572e37cf66ca97f610e61..50730edcf8ff95ac8292c6cc69198b8bef3825b6 100644 (file)
@@ -665,9 +665,9 @@ SELECT t1.id1, t1.result, t2.expected
 -- ******************************
 -- numeric AVG used to fail on some platforms
 SELECT AVG(val) FROM num_data;
-         avg          
-----------------------
- -13430913.5922423207
+          avg           
+------------------------
+ -13430913.592242320700
 (1 row)
 
 -- Check for appropriate rounding and overflow