Speed up array element assignment in plpgsql by caching type information.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 26 Sep 2011 19:38:07 +0000 (15:38 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 26 Sep 2011 19:38:07 +0000 (15:38 -0400)
Cache assorted data in the PLpgSQL_arrayelem struct to avoid repetitive
catalog lookups over multiple executions of the same statement.

Pavel Stehule

src/pl/plpgsql/src/gram.y
src/pl/plpgsql/src/pl_exec.c
src/pl/plpgsql/src/plpgsql.h
src/test/regress/expected/plpgsql.out
src/test/regress/sql/plpgsql.sql

index 92b54dd9cf629dc091059d717c869111f52b0b01..f8e956b2a423de75fe7dcef58d4901e184438303 100644 (file)
@@ -998,6 +998,8 @@ assign_var      : T_DATUM
                        new->dtype      = PLPGSQL_DTYPE_ARRAYELEM;
                        new->subscript  = $3;
                        new->arrayparentno = $1;
+                       /* initialize cached type data to "not valid" */
+                       new->parenttypoid = InvalidOid;
 
                        plpgsql_adddatum((PLpgSQL_datum *) new);
 
index f0ed762ccc343f6b5e0c6d366475bb95cdd527e0..b1ed3c3167c6120bd6d0d30998c6f006077e9791 100644 (file)
@@ -874,7 +874,8 @@ copy_plpgsql_datum(PLpgSQL_datum *datum)
 
            /*
             * These datum records are read-only at runtime, so no need to
-            * copy them
+            * copy them (well, ARRAYELEM contains some cached type data,
+            * but we'd just as soon centralize the caching anyway)
             */
            result = datum;
            break;
@@ -3986,20 +3987,16 @@ exec_assign_value(PLpgSQL_execstate *estate,
                /*
                 * Target is an element of an array
                 */
+               PLpgSQL_arrayelem *arrayelem;
                int         nsubscripts;
                int         i;
                PLpgSQL_expr *subscripts[MAXDIM];
                int         subscriptvals[MAXDIM];
-               bool        oldarrayisnull;
-               Oid         arraytypeid,
-                           arrayelemtypeid;
-               int32       arraytypmod;
-               int16       arraytyplen,
-                           elemtyplen;
-               bool        elemtypbyval;
-               char        elemtypalign;
                Datum       oldarraydatum,
                            coerced_value;
+               bool        oldarrayisnull;
+               Oid         parenttypoid;
+               int32       parenttypmod;
                ArrayType  *oldarrayval;
                ArrayType  *newarrayval;
                SPITupleTable *save_eval_tuptable;
@@ -4020,13 +4017,14 @@ exec_assign_value(PLpgSQL_execstate *estate,
                 * back to find the base array datum, and save the subscript
                 * expressions as we go.  (We are scanning right to left here,
                 * but want to evaluate the subscripts left-to-right to
-                * minimize surprises.)
+                * minimize surprises.)  Note that arrayelem is left pointing
+                * to the leftmost arrayelem datum, where we will cache the
+                * array element type data.
                 */
                nsubscripts = 0;
                do
                {
-                   PLpgSQL_arrayelem *arrayelem = (PLpgSQL_arrayelem *) target;
-
+                   arrayelem = (PLpgSQL_arrayelem *) target;
                    if (nsubscripts >= MAXDIM)
                        ereport(ERROR,
                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
@@ -4038,24 +4036,51 @@ exec_assign_value(PLpgSQL_execstate *estate,
 
                /* Fetch current value of array datum */
                exec_eval_datum(estate, target,
-                               &arraytypeid, &arraytypmod,
+                               &parenttypoid, &parenttypmod,
                                &oldarraydatum, &oldarrayisnull);
 
-               /* If target is domain over array, reduce to base type */
-               arraytypeid = getBaseTypeAndTypmod(arraytypeid, &arraytypmod);
-
-               /* ... and identify the element type */
-               arrayelemtypeid = get_element_type(arraytypeid);
-               if (!OidIsValid(arrayelemtypeid))
-                   ereport(ERROR,
-                           (errcode(ERRCODE_DATATYPE_MISMATCH),
-                            errmsg("subscripted object is not an array")));
-
-               get_typlenbyvalalign(arrayelemtypeid,
-                                    &elemtyplen,
-                                    &elemtypbyval,
-                                    &elemtypalign);
-               arraytyplen = get_typlen(arraytypeid);
+               /* Update cached type data if necessary */
+               if (arrayelem->parenttypoid != parenttypoid ||
+                   arrayelem->parenttypmod != parenttypmod)
+               {
+                   Oid         arraytypoid;
+                   int32       arraytypmod = parenttypmod;
+                   int16       arraytyplen;
+                   Oid         elemtypoid;
+                   int16       elemtyplen;
+                   bool        elemtypbyval;
+                   char        elemtypalign;
+
+                   /* If target is domain over array, reduce to base type */
+                   arraytypoid = getBaseTypeAndTypmod(parenttypoid,
+                                                      &arraytypmod);
+
+                   /* ... and identify the element type */
+                   elemtypoid = get_element_type(arraytypoid);
+                   if (!OidIsValid(elemtypoid))
+                       ereport(ERROR,
+                               (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                errmsg("subscripted object is not an array")));
+
+                   /* Collect needed data about the types */
+                   arraytyplen = get_typlen(arraytypoid);
+
+                   get_typlenbyvalalign(elemtypoid,
+                                        &elemtyplen,
+                                        &elemtypbyval,
+                                        &elemtypalign);
+
+                   /* Now safe to update the cached data */
+                   arrayelem->parenttypoid = parenttypoid;
+                   arrayelem->parenttypmod = parenttypmod;
+                   arrayelem->arraytypoid = arraytypoid;
+                   arrayelem->arraytypmod = arraytypmod;
+                   arrayelem->arraytyplen = arraytyplen;
+                   arrayelem->elemtypoid = elemtypoid;
+                   arrayelem->elemtyplen = elemtyplen;
+                   arrayelem->elemtypbyval = elemtypbyval;
+                   arrayelem->elemtypalign = elemtypalign;
+               }
 
                /*
                 * Evaluate the subscripts, switch into left-to-right order.
@@ -4093,8 +4118,8 @@ exec_assign_value(PLpgSQL_execstate *estate,
                /* Coerce source value to match array element type. */
                coerced_value = exec_simple_cast_value(value,
                                                       valtype,
-                                                      arrayelemtypeid,
-                                                      arraytypmod,
+                                                      arrayelem->elemtypoid,
+                                                      arrayelem->arraytypmod,
                                                       *isNull);
 
                /*
@@ -4107,12 +4132,12 @@ exec_assign_value(PLpgSQL_execstate *estate,
                 * array, either, so that's a no-op too.  This is all ugly but
                 * corresponds to the current behavior of ExecEvalArrayRef().
                 */
-               if (arraytyplen > 0 &&  /* fixed-length array? */
+               if (arrayelem->arraytyplen > 0 &&   /* fixed-length array? */
                    (oldarrayisnull || *isNull))
                    return;
 
                if (oldarrayisnull)
-                   oldarrayval = construct_empty_array(arrayelemtypeid);
+                   oldarrayval = construct_empty_array(arrayelem->elemtypoid);
                else
                    oldarrayval = (ArrayType *) DatumGetPointer(oldarraydatum);
 
@@ -4124,16 +4149,17 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                        subscriptvals,
                                        coerced_value,
                                        *isNull,
-                                       arraytyplen,
-                                       elemtyplen,
-                                       elemtypbyval,
-                                       elemtypalign);
+                                       arrayelem->arraytyplen,
+                                       arrayelem->elemtyplen,
+                                       arrayelem->elemtypbyval,
+                                       arrayelem->elemtypalign);
 
                /*
                 * Avoid leaking the result of exec_simple_cast_value, if it
                 * performed a conversion to a pass-by-ref type.
                 */
-               if (!*isNull && coerced_value != value && !elemtypbyval)
+               if (!*isNull && coerced_value != value &&
+                   !arrayelem->elemtypbyval)
                    pfree(DatumGetPointer(coerced_value));
 
                /*
@@ -4145,7 +4171,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
                *isNull = false;
                exec_assign_value(estate, target,
                                  PointerGetDatum(newarrayval),
-                                 arraytypeid, isNull);
+                                 arrayelem->arraytypoid, isNull);
 
                /*
                 * Avoid leaking the modified array value, too.
index ed8fcadb20730b86f0216630088571f9f25c6950..61503f10a786b18765893185721e12ff2587e8b8 100644 (file)
@@ -299,6 +299,16 @@ typedef struct
    int         dno;
    PLpgSQL_expr *subscript;
    int         arrayparentno;  /* dno of parent array variable */
+   /* Remaining fields are cached info about the array variable's type */
+   Oid         parenttypoid;   /* type of array variable; 0 if not yet set */
+   int32       parenttypmod;   /* typmod of array variable */
+   Oid         arraytypoid;    /* OID of actual array type */
+   int32       arraytypmod;    /* typmod of array (and its elements too) */
+   int16       arraytyplen;    /* typlen of array type */
+   Oid         elemtypoid;     /* OID of array element type */
+   int16       elemtyplen;     /* typlen of element type */
+   bool        elemtypbyval;   /* element type is pass-by-value? */
+   char        elemtypalign;   /* typalign of element type */
 } PLpgSQL_arrayelem;
 
 
index bed34c8a87ca97fa0be61c4bfc4021285f02a4cd..238bf5f0aec5e85e6af8598e4174797116644e25 100644 (file)
@@ -4509,3 +4509,65 @@ NOTICE:  {"(35,78)","(88,76)"}
 
 drop function foreach_test(anyarray);
 drop type xy_tuple;
+--
+-- Assorted tests for array subscript assignment
+--
+create temp table rtype (id int, ar text[]);
+create function arrayassign1() returns text[] language plpgsql as $$
+declare
+ r record;
+begin
+  r := row(12, '{foo,bar,baz}')::rtype;
+  r.ar[2] := 'replace';
+  return r.ar;
+end$$;
+select arrayassign1();
+   arrayassign1    
+-------------------
+ {foo,replace,baz}
+(1 row)
+
+select arrayassign1(); -- try again to exercise internal caching
+   arrayassign1    
+-------------------
+ {foo,replace,baz}
+(1 row)
+
+create domain orderedarray as int[2]
+  constraint sorted check (value[1] < value[2]);
+select '{1,2}'::orderedarray;
+ orderedarray 
+--------------
+ {1,2}
+(1 row)
+
+select '{2,1}'::orderedarray;  -- fail
+ERROR:  value for domain orderedarray violates check constraint "sorted"
+create function testoa(x1 int, x2 int, x3 int) returns orderedarray
+language plpgsql as $$
+declare res orderedarray;
+begin
+  res := array[x1, x2];
+  res[2] := x3;
+  return res;
+end$$;
+select testoa(1,2,3);
+ testoa 
+--------
+ {1,3}
+(1 row)
+
+select testoa(1,2,3); -- try again to exercise internal caching
+ testoa 
+--------
+ {1,3}
+(1 row)
+
+select testoa(2,1,3); -- fail at initial assign
+ERROR:  value for domain orderedarray violates check constraint "sorted"
+CONTEXT:  PL/pgSQL function "testoa" line 4 at assignment
+select testoa(1,2,1); -- fail at update
+ERROR:  value for domain orderedarray violates check constraint "sorted"
+CONTEXT:  PL/pgSQL function "testoa" line 5 at assignment
+drop function arrayassign1();
+drop function testoa(x1 int, x2 int, x3 int);
index 05f031575c02490b5afa048c405b6fc51b382bf8..b47c2de312a102a1342f54c5101be95edfa4d3b2 100644 (file)
@@ -3559,3 +3559,44 @@ select foreach_test(ARRAY[[(10,20),(40,69)],[(35,78),(88,76)]]::xy_tuple[]);
 
 drop function foreach_test(anyarray);
 drop type xy_tuple;
+
+--
+-- Assorted tests for array subscript assignment
+--
+
+create temp table rtype (id int, ar text[]);
+
+create function arrayassign1() returns text[] language plpgsql as $$
+declare
+ r record;
+begin
+  r := row(12, '{foo,bar,baz}')::rtype;
+  r.ar[2] := 'replace';
+  return r.ar;
+end$$;
+
+select arrayassign1();
+select arrayassign1(); -- try again to exercise internal caching
+
+create domain orderedarray as int[2]
+  constraint sorted check (value[1] < value[2]);
+
+select '{1,2}'::orderedarray;
+select '{2,1}'::orderedarray;  -- fail
+
+create function testoa(x1 int, x2 int, x3 int) returns orderedarray
+language plpgsql as $$
+declare res orderedarray;
+begin
+  res := array[x1, x2];
+  res[2] := x3;
+  return res;
+end$$;
+
+select testoa(1,2,3);
+select testoa(1,2,3); -- try again to exercise internal caching
+select testoa(2,1,3); -- fail at initial assign
+select testoa(1,2,1); -- fail at update
+
+drop function arrayassign1();
+drop function testoa(x1 int, x2 int, x3 int);