During ALTER TABLE ADD FOREIGN KEY, try to check the existing rows using
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 6 Oct 2003 16:38:28 +0000 (16:38 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 6 Oct 2003 16:38:28 +0000 (16:38 +0000)
a single LEFT JOIN query instead of firing the check trigger for each
row individually.  Stephan Szabo, with some kibitzing from Tom Lane and
Jan Wieck.

src/backend/commands/tablecmds.c
src/backend/utils/adt/ri_triggers.c
src/include/commands/trigger.h

index 3ece4f80f9586f275a95dc7f6aec9956d02bab31..395077081a7f595385ab311b23af33b664511f18 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.85 2003/10/02 06:36:37 petere Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.86 2003/10/06 16:38:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -3454,6 +3454,13 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint,
    List       *list;
    int         count;
 
+   /*
+    * See if we can do it with a single LEFT JOIN query.  A FALSE result
+    * indicates we must proceed with the fire-the-trigger method.
+    */
+   if (RI_Initial_Check(fkconstraint, rel, pkrel))
+       return;
+
    /*
     * Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
     * as if that tuple had just been inserted.  If any of those fail, it
index 11b7e84df03767bcc12556163bb3c5d99312a67e..306dda0aa4ebde4e4e0dd10929d2793ea81df8c2 100644 (file)
@@ -17,7 +17,7 @@
  *
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  *
- * $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.61 2003/10/01 21:30:52 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.62 2003/10/06 16:38:28 tgl Exp $
  *
  * ----------
  */
@@ -40,6 +40,7 @@
 #include "rewrite/rewriteHandler.h"
 #include "utils/lsyscache.h"
 #include "utils/typcache.h"
+#include "utils/acl.h"
 #include "miscadmin.h"
 
 
@@ -164,7 +165,8 @@ static void ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
                 Datum *vals, char *nulls);
 static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                   Relation pk_rel, Relation fk_rel,
-                  HeapTuple violator, bool spi_err);
+                  HeapTuple violator, TupleDesc tupdesc,
+                  bool spi_err);
 
 
 /* ----------
@@ -2540,7 +2542,205 @@ RI_FKey_keyequal_upd(TriggerData *trigdata)
 }
 
 
+/* ----------
+ * RI_Initial_Check -
+ *
+ * Check an entire table for non-matching values using a single query.
+ * This is not a trigger procedure, but is called during ALTER TABLE
+ * ADD FOREIGN KEY to validate the initial table contents.
+ *
+ * We expect that an exclusive lock has been taken on rel and pkrel;
+ * hence, we do not need to lock individual rows for the check.
+ *
+ * If the check fails because the current user doesn't have permissions
+ * to read both tables, return false to let our caller know that they will
+ * need to do something else to check the constraint.
+ * ----------
+ */
+bool
+RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel)
+{
+   const char *constrname = fkconstraint->constr_name;
+   char        querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
+                       (MAX_QUOTED_NAME_LEN + 32) * ((RI_MAX_NUMKEYS * 4)+1)];
+   char        pkrelname[MAX_QUOTED_REL_NAME_LEN];
+   char        relname[MAX_QUOTED_REL_NAME_LEN];
+   char        attname[MAX_QUOTED_NAME_LEN];
+   char        fkattname[MAX_QUOTED_NAME_LEN];
+   const char *sep;
+   List        *list;
+   List        *list2;
+   int         spi_result;
+   void        *qplan;
+
+   /*
+    * Check to make sure current user has enough permissions to do the
+    * test query.  (If not, caller can fall back to the trigger method,
+    * which works because it changes user IDs on the fly.)
+    *
+    * XXX are there any other show-stopper conditions to check?
+    */
+   if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
+       return false;
+   if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != ACLCHECK_OK) 
+       return false;
+
+   /*----------
+    * The query string built is:
+    *  SELECT fk.keycols FROM ONLY relname fk 
+    *   LEFT OUTER JOIN ONLY pkrelname pk 
+    *   ON (pk.pkkeycol1=fk.keycol1 [AND ...])
+    *   WHERE pk.pkkeycol1 IS NULL AND
+    * For MATCH unspecified:
+    *   (fk.keycol1 IS NOT NULL [AND ...])
+    * For MATCH FULL:
+    *   (fk.keycol1 IS NOT NULL [OR ...])
+    *----------
+    */
+
+   sprintf(querystr, "SELECT ");
+   sep="";
+   foreach(list, fkconstraint->fk_attrs)
+   {
+       quoteOneName(attname, strVal(lfirst(list)));
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                "%sfk.%s", sep, attname);
+       sep = ", ";
+   }
+
+   quoteRelationName(pkrelname, pkrel);
+   quoteRelationName(relname, rel);
+   snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+            " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
+            relname, pkrelname);
+
+   sep="";
+   for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs; 
+        list != NIL && list2 != NIL; 
+        list=lnext(list), list2=lnext(list2))
+   {
+       quoteOneName(attname, strVal(lfirst(list)));
+       quoteOneName(fkattname, strVal(lfirst(list2)));
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                "%spk.%s=fk.%s",
+                sep, attname, fkattname);
+       sep = " AND ";
+   }
+   /*
+    * It's sufficient to test any one pk attribute for null to detect a
+    * join failure.
+    */
+   quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs)));
+   snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+            ") WHERE pk.%s IS NULL AND (", attname);
+
+   sep="";
+   foreach(list, fkconstraint->fk_attrs)
+   {
+       quoteOneName(attname, strVal(lfirst(list)));
+       snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+                "%sfk.%s IS NOT NULL",
+                sep, attname);
+       switch (fkconstraint->fk_matchtype)
+       {
+           case FKCONSTR_MATCH_UNSPECIFIED:
+               sep=" AND ";
+               break;
+           case FKCONSTR_MATCH_FULL:
+               sep=" OR ";
+               break;
+           case FKCONSTR_MATCH_PARTIAL:
+               ereport(ERROR,
+                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("MATCH PARTIAL not yet implemented")));
+               break;
+           default:
+               elog(ERROR, "unrecognized match type: %d",
+                    fkconstraint->fk_matchtype);
+               break;
+       }
+   }
+   snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
+            ")");
+
+   if (SPI_connect() != SPI_OK_CONNECT)
+       elog(ERROR, "SPI_connect failed");
 
+   /*
+    * Generate the plan.  We don't need to cache it, and there are no
+    * arguments to the plan. 
+    */
+   qplan = SPI_prepare(querystr, 0, NULL);
+
+   if (qplan == NULL)
+       elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
+
+   /*
+    * Run the plan.  For safety we force a current query snapshot to be
+    * used.  (In serializable mode, this arguably violates serializability,
+    * but we really haven't got much choice.)  We need at most one tuple
+    * returned, so pass limit = 1.
+    */
+   spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1);
+
+   /* Check result */
+   if (spi_result != SPI_OK_SELECT)
+       elog(ERROR, "SPI_execp_current returned %d", spi_result);
+
+   /* Did we find a tuple violating the constraint? */
+   if (SPI_processed > 0)
+   {
+       HeapTuple   tuple = SPI_tuptable->vals[0];
+       TupleDesc   tupdesc = SPI_tuptable->tupdesc;
+       int         nkeys = length(fkconstraint->fk_attrs);
+       int         i;
+       RI_QueryKey qkey;
+
+       /*
+        * If it's MATCH FULL, and there are any nulls in the FK keys,
+        * complain about that rather than the lack of a match.  MATCH FULL
+        * disallows partially-null FK rows.
+        */
+       if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL)
+       {
+           bool    isnull = false;
+
+           for (i = 1; i <= nkeys; i++)
+           {
+               (void) SPI_getbinval(tuple, tupdesc, i, &isnull);
+               if (isnull)
+                   break;
+           }
+           if (isnull)
+               ereport(ERROR,
+                       (errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
+                        errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
+                               RelationGetRelationName(rel),
+                               constrname),
+                        errdetail("MATCH FULL does not allow mixing of null and nonnull key values.")));
+       }
+
+       /*
+        * Although we didn't cache the query, we need to set up a fake
+        * query key to pass to ri_ReportViolation.
+        */
+       MemSet(&qkey, 0, sizeof(qkey));
+       qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK;
+       qkey.nkeypairs = nkeys;
+       for (i = 0; i < nkeys; i++)
+           qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1;
+
+       ri_ReportViolation(&qkey, constrname,
+                          pkrel, rel,
+                          tuple, tupdesc,
+                          false);
+   }
+
+   if (SPI_finish() != SPI_OK_FINISH)
+       elog(ERROR, "SPI_finish failed");
+
+   return true;
+}
 
 
 /* ----------
@@ -2782,6 +2982,9 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
    /* Create the plan */
    qplan = SPI_prepare(querystr, nargs, argtypes);
 
+   if (qplan == NULL)
+       elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
+
    /* Restore UID */
    SetUserId(save_uid);
 
@@ -2905,6 +3108,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
        ri_ReportViolation(qkey, constrname ? constrname : "",
                           pk_rel, fk_rel,
                           new_tuple ? new_tuple : old_tuple,
+                          NULL,
                           true);
 
    /* XXX wouldn't it be clearer to do this part at the caller? */
@@ -2913,6 +3117,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
        ri_ReportViolation(qkey, constrname,
                           pk_rel, fk_rel,
                           new_tuple ? new_tuple : old_tuple,
+                          NULL,
                           false);
 
    return SPI_processed != 0;
@@ -2950,7 +3155,8 @@ ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
 static void
 ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                   Relation pk_rel, Relation fk_rel,
-                  HeapTuple violator, bool spi_err)
+                  HeapTuple violator, TupleDesc tupdesc,
+                  bool spi_err)
 {
 #define BUFLENGTH  512
    char        key_names[BUFLENGTH];
@@ -2958,7 +3164,6 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
    char       *name_ptr = key_names;
    char       *val_ptr = key_values;
    bool        onfk;
-   Relation    rel;
    int         idx,
                key_idx;
 
@@ -2972,18 +3177,21 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
                 errhint("This is most likely due to a rule having rewritten the query.")));
 
    /*
-    * rel is set to where the tuple description is coming from.
+    * Determine which relation to complain about.  If tupdesc wasn't
+    * passed by caller, assume the violator tuple came from there.
     */
    onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
    if (onfk)
    {
-       rel = fk_rel;
        key_idx = RI_KEYPAIR_FK_IDX;
+       if (tupdesc == NULL)
+           tupdesc = fk_rel->rd_att;
    }
    else
    {
-       rel = pk_rel;
        key_idx = RI_KEYPAIR_PK_IDX;
+       if (tupdesc == NULL)
+           tupdesc = pk_rel->rd_att;
    }
 
    /*
@@ -3008,8 +3216,8 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
        char       *name,
                   *val;
 
-       name = SPI_fname(rel->rd_att, fnum);
-       val = SPI_getvalue(violator, rel->rd_att, fnum);
+       name = SPI_fname(tupdesc, fnum);
+       val = SPI_getvalue(violator, tupdesc, fnum);
        if (!val)
            val = "null";
 
index d3aa9e385af935cc6cebdba53e154c70be1fde44..fe8f6af61d9fef5d5b9d12e4a1234e083eebdb38 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: trigger.h,v 1.43 2003/08/04 02:40:13 momjian Exp $
+ * $Id: trigger.h,v 1.44 2003/10/06 16:38:28 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -197,5 +197,8 @@ extern void DeferredTriggerSetState(ConstraintsSetStmt *stmt);
  * in utils/adt/ri_triggers.c
  */
 extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
+extern bool RI_Initial_Check(FkConstraint *fkconstraint, 
+                            Relation rel, 
+                            Relation pkrel);
 
 #endif   /* TRIGGER_H */