Make the world safe (more or less) for dropped columns in plpgsql rowtypes.
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Sep 2003 23:02:12 +0000 (23:02 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Sep 2003 23:02:12 +0000 (23:02 +0000)
src/backend/executor/execQual.c
src/backend/executor/nodeFunctionscan.c
src/pl/plpgsql/src/gram.y
src/pl/plpgsql/src/pl_comp.c
src/pl/plpgsql/src/pl_exec.c
src/pl/plpgsql/src/pl_funcs.c
src/pl/plpgsql/src/plpgsql.h

index 923d418b3a9dab679229e5efa43f44997c09b2d5..74560b1ba1dd82f2d0a5cd3f3d5ec85c73fdc9ce 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/executor/execQual.c,v 1.145 2003/09/25 06:57:59 petere Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/executor/execQual.c,v 1.146 2003/09/25 23:02:11 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -315,23 +315,6 @@ ExecEvalAggref(AggrefExprState *aggref, ExprContext *econtext, bool *isNull)
  *
  *     Returns a Datum whose value is the value of a range
  *     variable with respect to given expression context.
- *
- *
- *     As an entry condition, we expect that the datatype the
- *     plan expects to get (as told by our "variable" argument) is in
- *     fact the datatype of the attribute the plan says to fetch (as
- *     seen in the current context, identified by our "econtext"
- *     argument).
- *
- *     If we fetch a Type A attribute and Caller treats it as if it
- *     were Type B, there will be undefined results (e.g. crash).
- *     One way these might mismatch now is that we're accessing a
- *     catalog class and the type information in the pg_attribute
- *     class does not match the hardcoded pg_attribute information
- *     (in pg_attribute.h) for the class in question.
- *
- *     We have an Assert to make sure this entry condition is met.
- *
  * ---------------------------------------------------------------- */
 static Datum
 ExecEvalVar(Var *variable, ExprContext *econtext, bool *isNull)
@@ -369,11 +352,40 @@ ExecEvalVar(Var *variable, ExprContext *econtext, bool *isNull)
 
    attnum = variable->varattno;
 
-   /* (See prolog for explanation of this Assert) */
-   Assert(attnum <= 0 ||
-          (attnum - 1 <= tuple_type->natts - 1 &&
-           tuple_type->attrs[attnum - 1] != NULL &&
-         variable->vartype == tuple_type->attrs[attnum - 1]->atttypid));
+   /*
+    * Some checks that are only applied for user attribute numbers
+    * (bogus system attnums will be caught inside heap_getattr).
+    */
+   if (attnum > 0)
+   {
+       /*
+        * This assert checks that the attnum is valid.
+        */
+       Assert(attnum <= tuple_type->natts &&
+              tuple_type->attrs[attnum - 1] != NULL);
+
+       /*
+        * If the attribute's column has been dropped, we force a NULL result.
+        * This case should not happen in normal use, but it could happen if
+        * we are executing a plan cached before the column was dropped.
+        */
+       if (tuple_type->attrs[attnum - 1]->attisdropped)
+       {
+           *isNull = true;
+           return (Datum) 0;
+       }
+
+       /*
+        * This assert checks that the datatype the plan expects to get (as
+        * told by our "variable" argument) is in fact the datatype of the
+        * attribute being fetched (as seen in the current context, identified
+        * by our "econtext" argument).  Otherwise crashes are likely.
+        *
+        * Note that we can't check dropped columns, since their atttypid
+        * has been zeroed.
+        */
+       Assert(variable->vartype == tuple_type->attrs[attnum - 1]->atttypid);
+   }
 
    /*
     * If the attribute number is invalid, then we are supposed to return
index ca25774805b491ef1a2f9b0a7e8209515cfea051..a47e37d7e5574440d934fa19ed08309ee7abaa68 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/executor/nodeFunctionscan.c,v 1.21 2003/09/25 06:57:59 petere Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/executor/nodeFunctionscan.c,v 1.22 2003/09/25 23:02:12 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -35,7 +35,7 @@
 
 
 static TupleTableSlot *FunctionNext(FunctionScanState *node);
-static bool tupledesc_mismatch(TupleDesc tupdesc1, TupleDesc tupdesc2);
+static bool tupledesc_match(TupleDesc dst_tupdesc, TupleDesc src_tupdesc);
 
 /* ----------------------------------------------------------------
  *                     Scan Support
@@ -86,8 +86,7 @@ FunctionNext(FunctionScanState *node)
         * need to do this for functions returning RECORD, but might as
         * well do it always.
         */
-       if (funcTupdesc &&
-           tupledesc_mismatch(node->tupdesc, funcTupdesc))
+       if (funcTupdesc && !tupledesc_match(node->tupdesc, funcTupdesc))
            ereport(ERROR,
                    (errcode(ERRCODE_DATATYPE_MISMATCH),
                     errmsg("query-specified return row and actual function return row do not match")));
@@ -364,26 +363,36 @@ ExecFunctionReScan(FunctionScanState *node, ExprContext *exprCtxt)
        tuplestore_rescan(node->tuplestorestate);
 }
 
-
+/*
+ * Check that function result tuple type (src_tupdesc) matches or can be
+ * considered to match what the query expects (dst_tupdesc).
+ *
+ * We really only care about number of attributes and data type.
+ * Also, we can ignore type mismatch on columns that are dropped in the
+ * destination type, so long as the physical storage matches.  This is
+ * helpful in some cases involving out-of-date cached plans.
+ */
 static bool
-tupledesc_mismatch(TupleDesc tupdesc1, TupleDesc tupdesc2)
+tupledesc_match(TupleDesc dst_tupdesc, TupleDesc src_tupdesc)
 {
    int         i;
 
-   if (tupdesc1->natts != tupdesc2->natts)
-       return true;
+   if (dst_tupdesc->natts != src_tupdesc->natts)
+       return false;
 
-   for (i = 0; i < tupdesc1->natts; i++)
+   for (i = 0; i < dst_tupdesc->natts; i++)
    {
-       Form_pg_attribute attr1 = tupdesc1->attrs[i];
-       Form_pg_attribute attr2 = tupdesc2->attrs[i];
-
-       /*
-        * We really only care about number of attributes and data type
-        */
-       if (attr1->atttypid != attr2->atttypid)
-           return true;
+       Form_pg_attribute dattr = dst_tupdesc->attrs[i];
+       Form_pg_attribute sattr = src_tupdesc->attrs[i];
+
+       if (dattr->atttypid == sattr->atttypid)
+           continue;           /* no worries */
+       if (!dattr->attisdropped)
+           return false;
+       if (dattr->attlen != sattr->attlen ||
+           dattr->attalign != sattr->attalign)
+           return false;
    }
 
-   return false;
+   return true;
 }
index 97fb17f1b6cd380f1addb3c27d25f8ae8bb66004..a1408ab92ecc91473d0a977df51e1d562c7da104 100644 (file)
@@ -4,7 +4,7 @@
  *                       procedural language
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/gram.y,v 1.46 2003/07/27 21:49:54 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/gram.y,v 1.47 2003/09/25 23:02:12 tgl Exp $
  *
  *   This software is copyrighted by Jan Wieck - Hamburg.
  *
@@ -47,7 +47,6 @@ static    PLpgSQL_expr    *read_sql_stmt(const char *sqlstart);
 static PLpgSQL_type    *read_datatype(int tok);
 static PLpgSQL_stmt    *make_select_stmt(void);
 static PLpgSQL_stmt    *make_fetch_stmt(void);
-static PLpgSQL_expr    *make_tupret_expr(PLpgSQL_row *row);
 static void check_assignable(PLpgSQL_datum *datum);
 
 %}
@@ -493,7 +492,7 @@ decl_cursor_arglist : decl_cursor_arg
                        new->dtype = PLPGSQL_DTYPE_ROW;
                        new->refname = strdup("*internal*");
                        new->lineno = plpgsql_scanner_lineno();
-                       new->rowtypeclass = InvalidOid;
+                       new->rowtupdesc = NULL;
                        /*
                         * We make temporary fieldnames/varnos arrays that
                         * are much bigger than necessary.  We will resize
@@ -1176,24 +1175,24 @@ stmt_return     : K_RETURN lno
 
                        new = malloc(sizeof(PLpgSQL_stmt_return));
                        memset(new, 0, sizeof(PLpgSQL_stmt_return));
+                       new->expr = NULL;
+                       new->retrecno   = -1;
+                       new->retrowno   = -1;
 
                        if (plpgsql_curr_compile->fn_retistuple &&
                            !plpgsql_curr_compile->fn_retset)
                        {
-                           new->retrecno   = -1;
                            switch (yylex())
                            {
                                case K_NULL:
-                                   new->expr = NULL;
                                    break;
 
                                case T_ROW:
-                                   new->expr = make_tupret_expr(yylval.row);
+                                   new->retrowno = yylval.row->rowno;
                                    break;
 
                                case T_RECORD:
                                    new->retrecno = yylval.rec->recno;
-                                   new->expr = NULL;
                                    break;
 
                                default:
@@ -1874,7 +1873,7 @@ make_select_stmt(void)
                    row->dtype = PLPGSQL_DTYPE_ROW;
                    row->refname = strdup("*internal*");
                    row->lineno = plpgsql_scanner_lineno();
-                   row->rowtypeclass = InvalidOid;
+                   row->rowtupdesc = NULL;
                    row->nfields = nfields;
                    row->fieldnames = malloc(sizeof(char *) * nfields);
                    row->varnos = malloc(sizeof(int) * nfields);
@@ -2007,7 +2006,7 @@ make_fetch_stmt(void)
                row->dtype = PLPGSQL_DTYPE_ROW;
                row->refname = strdup("*internal*");
                row->lineno = plpgsql_scanner_lineno();
-               row->rowtypeclass = InvalidOid;
+               row->rowtupdesc = NULL;
                row->nfields = nfields;
                row->fieldnames = malloc(sizeof(char *) * nfields);
                row->varnos = malloc(sizeof(int) * nfields);
@@ -2041,36 +2040,6 @@ make_fetch_stmt(void)
 }
 
 
-static PLpgSQL_expr *
-make_tupret_expr(PLpgSQL_row *row)
-{
-   PLpgSQL_dstring     ds;
-   PLpgSQL_expr        *expr;
-   int                 i;
-   char                buf[16];
-
-   expr = malloc(sizeof(PLpgSQL_expr) + sizeof(int) * (row->nfields - 1));
-   expr->dtype         = PLPGSQL_DTYPE_EXPR;
-
-   plpgsql_dstring_init(&ds);
-   plpgsql_dstring_append(&ds, "SELECT ");
-
-   for (i = 0; i < row->nfields; i++)
-   {
-       sprintf(buf, "%s$%d", (i > 0) ? "," : "", i + 1);
-       plpgsql_dstring_append(&ds, buf);
-       expr->params[i] = row->varnos[i];
-   }
-
-   expr->query         = strdup(plpgsql_dstring_get(&ds));
-   expr->plan          = NULL;
-   expr->plan_argtypes = NULL;
-   expr->nparams       = row->nfields;
-
-   plpgsql_dstring_free(&ds);
-   return expr;
-}
-
 static void
 check_assignable(PLpgSQL_datum *datum)
 {
index 84685c2103447f3d967606aa1ee2bcc9383b59e3..7c9011ab7e2352826790c197d3ccfd392c4cfd25 100644 (file)
@@ -3,7 +3,7 @@
  *           procedural language
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_comp.c,v 1.67 2003/08/18 19:16:02 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_comp.c,v 1.68 2003/09/25 23:02:12 tgl Exp $
  *
  *   This software is copyrighted by Jan Wieck - Hamburg.
  *
@@ -899,7 +899,8 @@ plpgsql_parse_dblword(char *word)
                row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]);
                for (i = 0; i < row->nfields; i++)
                {
-                   if (strcmp(row->fieldnames[i], cp[1]) == 0)
+                   if (row->fieldnames[i] &&
+                       strcmp(row->fieldnames[i], cp[1]) == 0)
                    {
                        plpgsql_yylval.var = (PLpgSQL_var *) (plpgsql_Datums[row->varnos[i]]);
                        pfree(cp[0]);
@@ -1005,7 +1006,8 @@ plpgsql_parse_tripword(char *word)
                row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]);
                for (i = 0; i < row->nfields; i++)
                {
-                   if (strcmp(row->fieldnames[i], cp[2]) == 0)
+                   if (row->fieldnames[i] &&
+                       strcmp(row->fieldnames[i], cp[2]) == 0)
                    {
                        plpgsql_yylval.var = (PLpgSQL_var *) (plpgsql_Datums[row->varnos[i]]);
                        pfree(cp[0]);
@@ -1396,6 +1398,8 @@ plpgsql_parse_wordrowtype(char *word)
     */
    plpgsql_yylval.row = plpgsql_build_rowtype(classOid);
 
+   plpgsql_adddatum((PLpgSQL_datum *) plpgsql_yylval.row);
+
    pfree(cp[0]);
    pfree(cp[1]);
 
@@ -1439,6 +1443,8 @@ plpgsql_parse_dblwordrowtype(char *word)
     */
    plpgsql_yylval.row = plpgsql_build_rowtype(classOid);
 
+   plpgsql_adddatum((PLpgSQL_datum *) plpgsql_yylval.row);
+
    pfree(cp);
 
    return T_ROW;
@@ -1451,23 +1457,20 @@ PLpgSQL_row *
 plpgsql_build_rowtype(Oid classOid)
 {
    PLpgSQL_row *row;
-   HeapTuple   classtup;
+   Relation    rel;
    Form_pg_class classStruct;
    const char *relname;
    int         i;
+   MemoryContext oldcxt;
 
    /*
-    * Fetch the pg_class tuple.
+    * Open the relation to get info.
     */
-   classtup = SearchSysCache(RELOID,
-                             ObjectIdGetDatum(classOid),
-                             0, 0, 0);
-   if (!HeapTupleIsValid(classtup))
-       elog(ERROR, "cache lookup failed for relation %u", classOid);
-   classStruct = (Form_pg_class) GETSTRUCT(classtup);
-   relname = NameStr(classStruct->relname);
+   rel = heap_open(classOid, AccessShareLock);
+   classStruct = RelationGetForm(rel);
+   relname = RelationGetRelationName(rel);
 
-   /* accept relation, sequence, view, or type pg_class entries */
+   /* accept relation, sequence, view, or composite type entries */
    if (classStruct->relkind != RELKIND_RELATION &&
        classStruct->relkind != RELKIND_SEQUENCE &&
        classStruct->relkind != RELKIND_VIEW &&
@@ -1484,78 +1487,88 @@ plpgsql_build_rowtype(Oid classOid)
    memset(row, 0, sizeof(PLpgSQL_row));
 
    row->dtype = PLPGSQL_DTYPE_ROW;
+
+   /*
+    * This is a bit ugly --- need a permanent copy of the rel's tupdesc.
+    * Someday all these mallocs should go away in favor of a per-function
+    * memory context ...
+    */
+   oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+   row->rowtupdesc = CreateTupleDescCopy(RelationGetDescr(rel));
+   MemoryContextSwitchTo(oldcxt);
+
    row->nfields = classStruct->relnatts;
-   row->rowtypeclass = classStruct->reltype;
    row->fieldnames = malloc(sizeof(char *) * row->nfields);
    row->varnos = malloc(sizeof(int) * row->nfields);
 
    for (i = 0; i < row->nfields; i++)
    {
-       HeapTuple   attrtup;
        Form_pg_attribute attrStruct;
-       HeapTuple   typetup;
-       const char *attname;
-       PLpgSQL_var *var;
 
        /*
-        * Get the attribute and it's type
+        * Get the attribute and check for dropped column
         */
-       attrtup = SearchSysCache(ATTNUM,
-                                ObjectIdGetDatum(classOid),
-                                Int16GetDatum(i + 1),
-                                0, 0);
-       if (!HeapTupleIsValid(attrtup))
-           elog(ERROR, "cache lookup failed for attribute %d of relation %u",
-                i + 1, classOid);
-       attrStruct = (Form_pg_attribute) GETSTRUCT(attrtup);
-
-       attname = NameStr(attrStruct->attname);
-
-       typetup = SearchSysCache(TYPEOID,
-                                ObjectIdGetDatum(attrStruct->atttypid),
-                                0, 0, 0);
-       if (!HeapTupleIsValid(typetup))
-           elog(ERROR, "cache lookup failed for type %u",
-                attrStruct->atttypid);
+       attrStruct = RelationGetDescr(rel)->attrs[i];
 
-       /*
-        * Create the internal variable
-        *
-        * We know if the table definitions contain a default value or if the
-        * field is declared in the table as NOT NULL. But it's possible
-        * to create a table field as NOT NULL without a default value and
-        * that would lead to problems later when initializing the
-        * variables due to entering a block at execution time. Thus we
-        * ignore this information for now.
-        */
-       var = malloc(sizeof(PLpgSQL_var));
-       memset(var, 0, sizeof(PLpgSQL_var));
-       var->dtype = PLPGSQL_DTYPE_VAR;
-       var->refname = malloc(strlen(relname) + strlen(attname) + 2);
-       strcpy(var->refname, relname);
-       strcat(var->refname, ".");
-       strcat(var->refname, attname);
-       var->datatype = build_datatype(typetup, attrStruct->atttypmod);
-       var->isconst = false;
-       var->notnull = false;
-       var->default_val = NULL;
-       var->value = (Datum) 0;
-       var->isnull = true;
-       var->freeval = false;
-
-       plpgsql_adddatum((PLpgSQL_datum *) var);
+       if (!attrStruct->attisdropped)
+       {
+           const char *attname;
+           HeapTuple   typetup;
+           PLpgSQL_var *var;
 
-       /*
-        * Add the variable to the row.
-        */
-       row->fieldnames[i] = strdup(attname);
-       row->varnos[i] = var->varno;
+           attname = NameStr(attrStruct->attname);
+
+           typetup = SearchSysCache(TYPEOID,
+                                    ObjectIdGetDatum(attrStruct->atttypid),
+                                    0, 0, 0);
+           if (!HeapTupleIsValid(typetup))
+               elog(ERROR, "cache lookup failed for type %u",
+                    attrStruct->atttypid);
+
+           /*
+            * Create the internal variable for the field
+            *
+            * We know if the table definitions contain a default value or if
+            * the field is declared in the table as NOT NULL. But it's
+            * possible to create a table field as NOT NULL without a default
+            * value and that would lead to problems later when initializing
+            * the variables due to entering a block at execution time. Thus
+            * we ignore this information for now.
+            */
+           var = malloc(sizeof(PLpgSQL_var));
+           MemSet(var, 0, sizeof(PLpgSQL_var));
+           var->dtype = PLPGSQL_DTYPE_VAR;
+           var->refname = malloc(strlen(relname) + strlen(attname) + 2);
+           strcpy(var->refname, relname);
+           strcat(var->refname, ".");
+           strcat(var->refname, attname);
+           var->datatype = build_datatype(typetup, attrStruct->atttypmod);
+           var->isconst = false;
+           var->notnull = false;
+           var->default_val = NULL;
+           var->value = (Datum) 0;
+           var->isnull = true;
+           var->freeval = false;
 
-       ReleaseSysCache(typetup);
-       ReleaseSysCache(attrtup);
+           plpgsql_adddatum((PLpgSQL_datum *) var);
+
+           /*
+            * Add the variable to the row.
+            */
+           row->fieldnames[i] = strdup(attname);
+           row->varnos[i] = var->varno;
+
+           ReleaseSysCache(typetup);
+       }
+       else
+       {
+           /* Leave a hole in the row structure for the dropped col */
+           row->fieldnames[i] = NULL;
+           row->varnos[i] = -1;
+       }
    }
 
-   ReleaseSysCache(classtup);
+   heap_close(rel, AccessShareLock);
 
    return row;
 }
index f51394a2d69e9ac16ab07634ef33a817e358a604..f76dd952ac888b7041c5f4ac54b48d42705a8873 100644 (file)
@@ -3,7 +3,7 @@
  *           procedural language
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_exec.c,v 1.90 2003/08/04 00:43:33 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_exec.c,v 1.91 2003/09/25 23:02:12 tgl Exp $
  *
  *   This software is copyrighted by Jan Wieck - Hamburg.
  *
@@ -148,6 +148,9 @@ static void exec_move_row(PLpgSQL_execstate * estate,
              PLpgSQL_rec * rec,
              PLpgSQL_row * row,
              HeapTuple tup, TupleDesc tupdesc);
+static HeapTuple make_tuple_from_row(PLpgSQL_execstate * estate,
+                                    PLpgSQL_row * row,
+                                    TupleDesc tupdesc);
 static Datum exec_cast_value(Datum value, Oid valtype,
                Oid reqtype,
                FmgrInfo *reqinput,
@@ -1574,6 +1577,22 @@ exec_stmt_return(PLpgSQL_execstate * estate, PLpgSQL_stmt_return * stmt)
            return PLPGSQL_RC_RETURN;
        }
 
+       if (stmt->retrowno >= 0)
+       {
+           PLpgSQL_row *row = (PLpgSQL_row *) (estate->datums[stmt->retrowno]);
+
+           if (row->rowtupdesc) /* should always be true here */
+           {
+               estate->retval = (Datum) make_tuple_from_row(estate, row,
+                                                            row->rowtupdesc);
+               if (estate->retval == (Datum) NULL) /* should not happen */
+                   elog(ERROR, "row not compatible with its own tupdesc");
+               estate->rettupdesc = row->rowtupdesc;
+               estate->retisnull = false;
+           }
+           return PLPGSQL_RC_RETURN;
+       }
+
        if (stmt->expr != NULL)
        {
            exec_run_select(estate, stmt->expr, 1, NULL);
@@ -1650,37 +1669,11 @@ exec_stmt_return_next(PLpgSQL_execstate * estate,
    }
    else if (stmt->row)
    {
-       Datum      *dvalues;
-       char       *nulls;
-       int         i;
-
-       if (natts != stmt->row->nfields)
+       tuple = make_tuple_from_row(estate, stmt->row, tupdesc);
+       if (tuple == NULL)
            ereport(ERROR,
                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                  errmsg("wrong record type supplied in RETURN NEXT")));
-
-       dvalues = (Datum *) palloc0(natts * sizeof(Datum));
-       nulls = (char *) palloc(natts * sizeof(char));
-       MemSet(nulls, 'n', natts);
-
-       for (i = 0; i < natts; i++)
-       {
-           PLpgSQL_var *var;
-
-           var = (PLpgSQL_var *) (estate->datums[stmt->row->varnos[i]]);
-           if (var->datatype->typoid != tupdesc->attrs[i]->atttypid)
-               ereport(ERROR,
-                       (errcode(ERRCODE_DATATYPE_MISMATCH),
-                  errmsg("wrong record type supplied in RETURN NEXT")));
-           dvalues[i] = var->value;
-           if (!var->isnull)
-               nulls[i] = ' ';
-       }
-
-       tuple = heap_formtuple(tupdesc, dvalues, nulls);
-
-       pfree(dvalues);
-       pfree(nulls);
+                    errmsg("wrong record type supplied in RETURN NEXT")));
        free_tuple = true;
    }
    else if (stmt->expr)
@@ -3412,7 +3405,8 @@ exec_move_row(PLpgSQL_execstate * estate,
     * expected if it's from an inheritance-child table of the current
     * table, or it might have fewer if the table has had columns added by
     * ALTER TABLE. Ignore extra columns and assume NULL for missing
-    * columns, the same as heap_getattr would do.
+    * columns, the same as heap_getattr would do.  We also have to skip
+    * over dropped columns in either the source or destination.
     *
     * If we have no tuple data at all, we'll assign NULL to all columns of
     * the row variable.
@@ -3420,25 +3414,35 @@ exec_move_row(PLpgSQL_execstate * estate,
    if (row != NULL)
    {
        int         t_natts;
-       int         i;
+       int         fnum;
+       int         anum;
 
        if (HeapTupleIsValid(tup))
            t_natts = tup->t_data->t_natts;
        else
            t_natts = 0;
 
-       for (i = 0; i < row->nfields; i++)
+       anum = 0;
+       for (fnum = 0; fnum < row->nfields; fnum++)
        {
            PLpgSQL_var *var;
            Datum       value;
            bool        isnull;
            Oid         valtype;
 
-           var = (PLpgSQL_var *) (estate->datums[row->varnos[i]]);
-           if (i < t_natts)
+           if (row->varnos[fnum] < 0)
+               continue;       /* skip dropped column in row struct */
+
+           var = (PLpgSQL_var *) (estate->datums[row->varnos[fnum]]);
+
+           while (anum < t_natts && tupdesc->attrs[anum]->attisdropped)
+               anum++;         /* skip dropped column in tuple */
+
+           if (anum < t_natts)
            {
-               value = SPI_getbinval(tup, tupdesc, i + 1, &isnull);
-               valtype = SPI_gettypeid(tupdesc, i + 1);
+               value = SPI_getbinval(tup, tupdesc, anum + 1, &isnull);
+               valtype = SPI_gettypeid(tupdesc, anum + 1);
+               anum++;
            }
            else
            {
@@ -3447,7 +3451,7 @@ exec_move_row(PLpgSQL_execstate * estate,
                valtype = InvalidOid;
            }
 
-           exec_assign_value(estate, estate->datums[row->varnos[i]],
+           exec_assign_value(estate, (PLpgSQL_datum *) var,
                              value, valtype, &isnull);
        }
 
@@ -3457,6 +3461,54 @@ exec_move_row(PLpgSQL_execstate * estate,
    elog(ERROR, "unsupported target");
 }
 
+/* ----------
+ * make_tuple_from_row     Make a tuple from the values of a row object
+ *
+ * A NULL return indicates rowtype mismatch; caller must raise suitable error
+ * ----------
+ */
+static HeapTuple
+make_tuple_from_row(PLpgSQL_execstate * estate,
+                   PLpgSQL_row * row,
+                   TupleDesc tupdesc)
+{
+   int         natts = tupdesc->natts;
+   HeapTuple   tuple;
+   Datum      *dvalues;
+   char       *nulls;
+   int         i;
+
+   if (natts != row->nfields)
+       return NULL;
+
+   dvalues = (Datum *) palloc0(natts * sizeof(Datum));
+   nulls = (char *) palloc(natts * sizeof(char));
+   MemSet(nulls, 'n', natts);
+
+   for (i = 0; i < natts; i++)
+   {
+       PLpgSQL_var *var;
+
+       if (tupdesc->attrs[i]->attisdropped)
+           continue;       /* leave the column as null */
+       if (row->varnos[i] < 0) /* should not happen */
+           elog(ERROR, "dropped rowtype entry for non-dropped column");
+
+       var = (PLpgSQL_var *) (estate->datums[row->varnos[i]]);
+       if (var->datatype->typoid != tupdesc->attrs[i]->atttypid)
+           return NULL;
+       dvalues[i] = var->value;
+       if (!var->isnull)
+           nulls[i] = ' ';
+   }
+
+   tuple = heap_formtuple(tupdesc, dvalues, nulls);
+
+   pfree(dvalues);
+   pfree(nulls);
+
+   return tuple;
+}
 
 /* ----------
  * exec_cast_value         Cast a value if required
index 74be1447340b6625bf87ebbb102f7a2b7a31020f..c47da263099589a669ca8942b2d0bb346cc48e87 100644 (file)
@@ -3,7 +3,7 @@
  *           procedural language
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_funcs.c,v 1.29 2003/08/04 00:43:33 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/pl_funcs.c,v 1.30 2003/09/25 23:02:12 tgl Exp $
  *
  *   This software is copyrighted by Jan Wieck - Hamburg.
  *
@@ -848,15 +848,14 @@ dump_return(PLpgSQL_stmt_return * stmt)
 {
    dump_ind();
    printf("RETURN ");
-   if (stmt->retrecno > 0)
+   if (stmt->retrecno >= 0)
        printf("record %d", stmt->retrecno);
+   else if (stmt->retrowno >= 0)
+       printf("row %d", stmt->retrowno);
+   else if (stmt->expr == NULL)
+       printf("NULL");
    else
-   {
-       if (stmt->expr == NULL)
-           printf("NULL");
-       else
-           dump_expr(stmt->expr);
-   }
+       dump_expr(stmt->expr);
    printf("\n");
 }
 
@@ -1031,8 +1030,9 @@ plpgsql_dumptree(PLpgSQL_function * func)
                    printf("ROW %-16s fields", row->refname);
                    for (i = 0; i < row->nfields; i++)
                    {
-                       printf(" %s=var %d", row->fieldnames[i],
-                              row->varnos[i]);
+                       if (row->fieldnames[i])
+                           printf(" %s=var %d", row->fieldnames[i],
+                                  row->varnos[i]);
                    }
                    printf("\n");
                }
index 3ec33091e358500d7dfbc893d63caab1eea7f8b1..80e79df30b8b78a037f08b965e8226a9a777af52 100644 (file)
@@ -3,7 +3,7 @@
  *           procedural language
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/plpgsql.h,v 1.40 2003/08/18 19:16:02 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/pl/plpgsql/src/plpgsql.h,v 1.41 2003/09/25 23:02:12 tgl Exp $
  *
  *   This software is copyrighted by Jan Wieck - Hamburg.
  *
@@ -207,8 +207,15 @@ typedef struct
    int         rowno;
    char       *refname;
    int         lineno;
-   Oid         rowtypeclass;
+   TupleDesc   rowtupdesc;
 
+   /*
+    * Note: TupleDesc is only set up for named rowtypes, else it is NULL.
+    *
+    * Note: if the underlying rowtype contains a dropped column, the
+    * corresponding fieldnames[] entry will be NULL, and there is no
+    * corresponding var (varnos[] will be -1).
+    */
    int         nfields;
    char      **fieldnames;
    int        *varnos;
@@ -449,6 +456,7 @@ typedef struct
    int         lineno;
    PLpgSQL_expr *expr;
    int         retrecno;
+   int         retrowno;
 }  PLpgSQL_stmt_return;
 
 typedef struct