Speed up array element assignment in plpgsql by caching type information.
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 26 Sep 2011 19:38:07 +0000 (15:38 -0400)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 26 Sep 2011 19:38:07 +0000 (15:38 -0400)
Cache assorted data in the PLpgSQL_arrayelem struct to avoid repetitive
catalog lookups over multiple executions of the same statement.

Pavel Stehule

src/pl/plpgsql/src/gram.y
src/pl/plpgsql/src/pl_exec.c
src/pl/plpgsql/src/plpgsql.h
src/test/regress/expected/plpgsql.out
src/test/regress/sql/plpgsql.sql

index 92b54dd9cf629dc091059d717c869111f52b0b01..f8e956b2a423de75fe7dcef58d4901e184438303 100644 (file)
@@ -998,6 +998,8 @@ assign_var          : T_DATUM
                                                new->dtype              = PLPGSQL_DTYPE_ARRAYELEM;
                                                new->subscript  = $3;
                                                new->arrayparentno = $1;
+                                               /* initialize cached type data to "not valid" */
+                                               new->parenttypoid = InvalidOid;
 
                                                plpgsql_adddatum((PLpgSQL_datum *) new);
 
index f0ed762ccc343f6b5e0c6d366475bb95cdd527e0..b1ed3c3167c6120bd6d0d30998c6f006077e9791 100644 (file)
@@ -874,7 +874,8 @@ copy_plpgsql_datum(PLpgSQL_datum *datum)
 
                        /*
                         * These datum records are read-only at runtime, so no need to
-                        * copy them
+                        * copy them (well, ARRAYELEM contains some cached type data,
+                        * but we'd just as soon centralize the caching anyway)
                         */
                        result = datum;
                        break;
@@ -3986,20 +3987,16 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                /*
                                 * Target is an element of an array
                                 */
+                               PLpgSQL_arrayelem *arrayelem;
                                int                     nsubscripts;
                                int                     i;
                                PLpgSQL_expr *subscripts[MAXDIM];
                                int                     subscriptvals[MAXDIM];
-                               bool            oldarrayisnull;
-                               Oid                     arraytypeid,
-                                                       arrayelemtypeid;
-                               int32           arraytypmod;
-                               int16           arraytyplen,
-                                                       elemtyplen;
-                               bool            elemtypbyval;
-                               char            elemtypalign;
                                Datum           oldarraydatum,
                                                        coerced_value;
+                               bool            oldarrayisnull;
+                               Oid                     parenttypoid;
+                               int32           parenttypmod;
                                ArrayType  *oldarrayval;
                                ArrayType  *newarrayval;
                                SPITupleTable *save_eval_tuptable;
@@ -4020,13 +4017,14 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                 * back to find the base array datum, and save the subscript
                                 * expressions as we go.  (We are scanning right to left here,
                                 * but want to evaluate the subscripts left-to-right to
-                                * minimize surprises.)
+                                * minimize surprises.)  Note that arrayelem is left pointing
+                                * to the leftmost arrayelem datum, where we will cache the
+                                * array element type data.
                                 */
                                nsubscripts = 0;
                                do
                                {
-                                       PLpgSQL_arrayelem *arrayelem = (PLpgSQL_arrayelem *) target;
-
+                                       arrayelem = (PLpgSQL_arrayelem *) target;
                                        if (nsubscripts >= MAXDIM)
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
@@ -4038,24 +4036,51 @@ exec_assign_value(PLpgSQL_execstate *estate,
 
                                /* Fetch current value of array datum */
                                exec_eval_datum(estate, target,
-                                                               &arraytypeid, &arraytypmod,
+                                                               &parenttypoid, &parenttypmod,
                                                                &oldarraydatum, &oldarrayisnull);
 
-                               /* If target is domain over array, reduce to base type */
-                               arraytypeid = getBaseTypeAndTypmod(arraytypeid, &arraytypmod);
-
-                               /* ... and identify the element type */
-                               arrayelemtypeid = get_element_type(arraytypeid);
-                               if (!OidIsValid(arrayelemtypeid))
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                                                        errmsg("subscripted object is not an array")));
-
-                               get_typlenbyvalalign(arrayelemtypeid,
-                                                                        &elemtyplen,
-                                                                        &elemtypbyval,
-                                                                        &elemtypalign);
-                               arraytyplen = get_typlen(arraytypeid);
+                               /* Update cached type data if necessary */
+                               if (arrayelem->parenttypoid != parenttypoid ||
+                                       arrayelem->parenttypmod != parenttypmod)
+                               {
+                                       Oid                     arraytypoid;
+                                       int32           arraytypmod = parenttypmod;
+                                       int16           arraytyplen;
+                                       Oid                     elemtypoid;
+                                       int16           elemtyplen;
+                                       bool            elemtypbyval;
+                                       char            elemtypalign;
+
+                                       /* If target is domain over array, reduce to base type */
+                                       arraytypoid = getBaseTypeAndTypmod(parenttypoid,
+                                                                                                          &arraytypmod);
+
+                                       /* ... and identify the element type */
+                                       elemtypoid = get_element_type(arraytypoid);
+                                       if (!OidIsValid(elemtypoid))
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                                                errmsg("subscripted object is not an array")));
+
+                                       /* Collect needed data about the types */
+                                       arraytyplen = get_typlen(arraytypoid);
+
+                                       get_typlenbyvalalign(elemtypoid,
+                                                                                &elemtyplen,
+                                                                                &elemtypbyval,
+                                                                                &elemtypalign);
+
+                                       /* Now safe to update the cached data */
+                                       arrayelem->parenttypoid = parenttypoid;
+                                       arrayelem->parenttypmod = parenttypmod;
+                                       arrayelem->arraytypoid = arraytypoid;
+                                       arrayelem->arraytypmod = arraytypmod;
+                                       arrayelem->arraytyplen = arraytyplen;
+                                       arrayelem->elemtypoid = elemtypoid;
+                                       arrayelem->elemtyplen = elemtyplen;
+                                       arrayelem->elemtypbyval = elemtypbyval;
+                                       arrayelem->elemtypalign = elemtypalign;
+                               }
 
                                /*
                                 * Evaluate the subscripts, switch into left-to-right order.
@@ -4093,8 +4118,8 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                /* Coerce source value to match array element type. */
                                coerced_value = exec_simple_cast_value(value,
                                                                                                           valtype,
-                                                                                                          arrayelemtypeid,
-                                                                                                          arraytypmod,
+                                                                                                          arrayelem->elemtypoid,
+                                                                                                          arrayelem->arraytypmod,
                                                                                                           *isNull);
 
                                /*
@@ -4107,12 +4132,12 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                 * array, either, so that's a no-op too.  This is all ugly but
                                 * corresponds to the current behavior of ExecEvalArrayRef().
                                 */
-                               if (arraytyplen > 0 &&  /* fixed-length array? */
+                               if (arrayelem->arraytyplen > 0 &&       /* fixed-length array? */
                                        (oldarrayisnull || *isNull))
                                        return;
 
                                if (oldarrayisnull)
-                                       oldarrayval = construct_empty_array(arrayelemtypeid);
+                                       oldarrayval = construct_empty_array(arrayelem->elemtypoid);
                                else
                                        oldarrayval = (ArrayType *) DatumGetPointer(oldarraydatum);
 
@@ -4124,16 +4149,17 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                                                                subscriptvals,
                                                                                coerced_value,
                                                                                *isNull,
-                                                                               arraytyplen,
-                                                                               elemtyplen,
-                                                                               elemtypbyval,
-                                                                               elemtypalign);
+                                                                               arrayelem->arraytyplen,
+                                                                               arrayelem->elemtyplen,
+                                                                               arrayelem->elemtypbyval,
+                                                                               arrayelem->elemtypalign);
 
                                /*
                                 * Avoid leaking the result of exec_simple_cast_value, if it
                                 * performed a conversion to a pass-by-ref type.
                                 */
-                               if (!*isNull && coerced_value != value && !elemtypbyval)
+                               if (!*isNull && coerced_value != value &&
+                                       !arrayelem->elemtypbyval)
                                        pfree(DatumGetPointer(coerced_value));
 
                                /*
@@ -4145,7 +4171,7 @@ exec_assign_value(PLpgSQL_execstate *estate,
                                *isNull = false;
                                exec_assign_value(estate, target,
                                                                  PointerGetDatum(newarrayval),
-                                                                 arraytypeid, isNull);
+                                                                 arrayelem->arraytypoid, isNull);
 
                                /*
                                 * Avoid leaking the modified array value, too.
index ed8fcadb20730b86f0216630088571f9f25c6950..61503f10a786b18765893185721e12ff2587e8b8 100644 (file)
@@ -299,6 +299,16 @@ typedef struct
        int                     dno;
        PLpgSQL_expr *subscript;
        int                     arrayparentno;  /* dno of parent array variable */
+       /* Remaining fields are cached info about the array variable's type */
+       Oid                     parenttypoid;   /* type of array variable; 0 if not yet set */
+       int32           parenttypmod;   /* typmod of array variable */
+       Oid                     arraytypoid;    /* OID of actual array type */
+       int32           arraytypmod;    /* typmod of array (and its elements too) */
+       int16           arraytyplen;    /* typlen of array type */
+       Oid                     elemtypoid;             /* OID of array element type */
+       int16           elemtyplen;             /* typlen of element type */
+       bool            elemtypbyval;   /* element type is pass-by-value? */
+       char            elemtypalign;   /* typalign of element type */
 } PLpgSQL_arrayelem;
 
 
index bed34c8a87ca97fa0be61c4bfc4021285f02a4cd..238bf5f0aec5e85e6af8598e4174797116644e25 100644 (file)
@@ -4509,3 +4509,65 @@ NOTICE:  {"(35,78)","(88,76)"}
 
 drop function foreach_test(anyarray);
 drop type xy_tuple;
+--
+-- Assorted tests for array subscript assignment
+--
+create temp table rtype (id int, ar text[]);
+create function arrayassign1() returns text[] language plpgsql as $$
+declare
+ r record;
+begin
+  r := row(12, '{foo,bar,baz}')::rtype;
+  r.ar[2] := 'replace';
+  return r.ar;
+end$$;
+select arrayassign1();
+   arrayassign1    
+-------------------
+ {foo,replace,baz}
+(1 row)
+
+select arrayassign1(); -- try again to exercise internal caching
+   arrayassign1    
+-------------------
+ {foo,replace,baz}
+(1 row)
+
+create domain orderedarray as int[2]
+  constraint sorted check (value[1] < value[2]);
+select '{1,2}'::orderedarray;
+ orderedarray 
+--------------
+ {1,2}
+(1 row)
+
+select '{2,1}'::orderedarray;  -- fail
+ERROR:  value for domain orderedarray violates check constraint "sorted"
+create function testoa(x1 int, x2 int, x3 int) returns orderedarray
+language plpgsql as $$
+declare res orderedarray;
+begin
+  res := array[x1, x2];
+  res[2] := x3;
+  return res;
+end$$;
+select testoa(1,2,3);
+ testoa 
+--------
+ {1,3}
+(1 row)
+
+select testoa(1,2,3); -- try again to exercise internal caching
+ testoa 
+--------
+ {1,3}
+(1 row)
+
+select testoa(2,1,3); -- fail at initial assign
+ERROR:  value for domain orderedarray violates check constraint "sorted"
+CONTEXT:  PL/pgSQL function "testoa" line 4 at assignment
+select testoa(1,2,1); -- fail at update
+ERROR:  value for domain orderedarray violates check constraint "sorted"
+CONTEXT:  PL/pgSQL function "testoa" line 5 at assignment
+drop function arrayassign1();
+drop function testoa(x1 int, x2 int, x3 int);
index 05f031575c02490b5afa048c405b6fc51b382bf8..b47c2de312a102a1342f54c5101be95edfa4d3b2 100644 (file)
@@ -3559,3 +3559,44 @@ select foreach_test(ARRAY[[(10,20),(40,69)],[(35,78),(88,76)]]::xy_tuple[]);
 
 drop function foreach_test(anyarray);
 drop type xy_tuple;
+
+--
+-- Assorted tests for array subscript assignment
+--
+
+create temp table rtype (id int, ar text[]);
+
+create function arrayassign1() returns text[] language plpgsql as $$
+declare
+ r record;
+begin
+  r := row(12, '{foo,bar,baz}')::rtype;
+  r.ar[2] := 'replace';
+  return r.ar;
+end$$;
+
+select arrayassign1();
+select arrayassign1(); -- try again to exercise internal caching
+
+create domain orderedarray as int[2]
+  constraint sorted check (value[1] < value[2]);
+
+select '{1,2}'::orderedarray;
+select '{2,1}'::orderedarray;  -- fail
+
+create function testoa(x1 int, x2 int, x3 int) returns orderedarray
+language plpgsql as $$
+declare res orderedarray;
+begin
+  res := array[x1, x2];
+  res[2] := x3;
+  return res;
+end$$;
+
+select testoa(1,2,3);
+select testoa(1,2,3); -- try again to exercise internal caching
+select testoa(2,1,3); -- fail at initial assign
+select testoa(1,2,1); -- fail at update
+
+drop function arrayassign1();
+drop function testoa(x1 int, x2 int, x3 int);