Correct attach/detach logic for FKs in partitions
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 12 Oct 2018 15:36:26 +0000 (12:36 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 12 Oct 2018 15:37:37 +0000 (12:37 -0300)
There was no code to handle foreign key constraints on partitioned
tables in the case of ALTER TABLE DETACH; and if you happened to ATTACH
a partition that already had an equivalent constraint, that one was
ignored and a new constraint was created.  Adding this to the fact that
foreign key cloning reuses the constraint name on the partition instead
of generating a new name (as it probably should, to cater to SQL
standard rules about constraint naming within schemas), the result was a
pretty poor user experience -- the most visible failure was that just
detaching a partition and re-attaching it failed with an error such as

  ERROR:  duplicate key value violates unique constraint "pg_constraint_conrelid_contypid_conname_index"
  DETAIL:  Key (conrelid, contypid, conname)=(26702, 0, test_result_asset_id_fkey) already exists.

because it would try to create an identically-named constraint in the
partition.  To make matters worse, if you tried to drop the constraint
in the now-independent partition, that would fail because the constraint
was still seen as dependent on the constraint in its former parent
partitioned table:
  ERROR:  cannot drop inherited constraint "test_result_asset_id_fkey" of relation "test_result_cbsystem_0001_0050_monthly_2018_09"

This fix attacks the problem from two angles: first, when the partition
is detached, the constraint is also marked as independent, so the drop
now works.  Second, when the partition is re-attached, we scan existing
constraints searching for one matching the FK in the parent, and if one
exists, we link that one to the parent constraint.  So we don't end up
with a duplicate -- and better yet, we don't need to scan the referenced
table to verify that the constraint holds.

To implement this I made a small change to previously planner-only
struct ForeignKeyCacheInfo to contain the constraint OID; also relcache
now maintains the list of FKs for partitioned tables too.

Backpatch to 11.

Reported-by: Michael Vitale (bug #15425)
Discussion: https://postgr.es/m/15425-2dbc9d2aa999f816@postgresql.org

src/backend/catalog/pg_constraint.c
src/backend/commands/tablecmds.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/utils/cache/relcache.c
src/include/utils/rel.h
src/test/regress/expected/foreign_key.out
src/test/regress/sql/foreign_key.sql

index 2063abb8aeb6a4c55e83188f05e6198e02ac9850..f4057a9f15276c1d001966b3df72a1ab170002bb 100644 (file)
@@ -19,6 +19,7 @@
 #include "access/htup_details.h"
 #include "access/sysattr.h"
 #include "access/tupconvert.h"
+#include "access/xact.h"
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/objectaccess.h"
 #include "utils/tqual.h"
 
 
+static void clone_fk_constraints(Relation pg_constraint, Relation parentRel,
+                                        Relation partRel, List *clone, List **cloned);
+
+
 /*
  * CreateConstraintEntry
  *     Create a constraint table entry.
@@ -400,34 +405,74 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
        Relation        rel;
        ScanKeyData key;
        SysScanDesc scan;
-       TupleDesc       tupdesc;
        HeapTuple       tuple;
-       AttrNumber *attmap;
+       List       *clone = NIL;
 
        parentRel = heap_open(parentId, NoLock);        /* already got lock */
        /* see ATAddForeignKeyConstraint about lock level */
        rel = heap_open(relationId, AccessExclusiveLock);
-
        pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+
+       /* Obtain the list of constraints to clone or attach */
+       ScanKeyInit(&key,
+                               Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+                               F_OIDEQ, ObjectIdGetDatum(parentId));
+       scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
+                                                         NULL, 1, &key);
+       while ((tuple = systable_getnext(scan)) != NULL)
+               clone = lappend_oid(clone, HeapTupleGetOid(tuple));
+       systable_endscan(scan);
+
+       /* Do the actual work, recursing to partitions as needed */
+       clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned);
+
+       /* We're done.  Clean up */
+       heap_close(parentRel, NoLock);
+       heap_close(rel, NoLock);        /* keep lock till commit */
+       heap_close(pg_constraint, RowShareLock);
+}
+
+/*
+ * clone_fk_constraints
+ *             Recursive subroutine for CloneForeignKeyConstraints
+ *
+ * Clone the given list of FK constraints when a partition is attached.
+ *
+ * When cloning foreign keys to a partition, it may happen that equivalent
+ * constraints already exist in the partition for some of them.  We can skip
+ * creating a clone in that case, and instead just attach the existing
+ * constraint to the one in the parent.
+ *
+ * This function recurses to partitions, if the new partition is partitioned;
+ * of course, only do this for FKs that were actually cloned.
+ */
+static void
+clone_fk_constraints(Relation pg_constraint, Relation parentRel,
+                                        Relation partRel, List *clone, List **cloned)
+{
+       TupleDesc       tupdesc;
+       AttrNumber *attmap;
+       List       *partFKs;
+       List       *subclone = NIL;
+       ListCell   *cell;
+
        tupdesc = RelationGetDescr(pg_constraint);
 
        /*
         * The constraint key may differ, if the columns in the partition are
         * different.  This map is used to convert them.
         */
-       attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
+       attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
                                                                                RelationGetDescr(parentRel),
                                                                                gettext_noop("could not convert row type"));
 
-       ScanKeyInit(&key,
-                               Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
-                               F_OIDEQ, ObjectIdGetDatum(parentId));
-       scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
-                                                         NULL, 1, &key);
+       partFKs = copyObject(RelationGetFKeyList(partRel));
 
-       while ((tuple = systable_getnext(scan)) != NULL)
+       foreach(cell, clone)
        {
-               Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+               Oid                     parentConstrOid = lfirst_oid(cell);
+               Form_pg_constraint constrForm;
+               HeapTuple       tuple;
                AttrNumber      conkey[INDEX_MAX_KEYS];
                AttrNumber      mapped_conkey[INDEX_MAX_KEYS];
                AttrNumber      confkey[INDEX_MAX_KEYS];
@@ -435,22 +480,31 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
                Oid                     conppeqop[INDEX_MAX_KEYS];
                Oid                     conffeqop[INDEX_MAX_KEYS];
                Constraint *fkconstraint;
-               ClonedConstraint *newc;
+               bool            attach_it;
                Oid                     constrOid;
                ObjectAddress parentAddr,
                                        childAddr;
                int                     nelem;
+               ListCell   *cell;
                int                     i;
                ArrayType  *arr;
                Datum           datum;
                bool            isnull;
 
+               tuple = SearchSysCache1(CONSTROID, parentConstrOid);
+               if (!tuple)
+                       elog(ERROR, "cache lookup failed for constraint %u",
+                                parentConstrOid);
+               constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+
                /* only foreign keys */
                if (constrForm->contype != CONSTRAINT_FOREIGN)
+               {
+                       ReleaseSysCache(tuple);
                        continue;
+               }
 
-               ObjectAddressSet(parentAddr, ConstraintRelationId,
-                                                HeapTupleGetOid(tuple));
+               ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
 
                datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
                                                        tupdesc, &isnull);
@@ -539,6 +593,90 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
                        elog(ERROR, "conffeqop is not a 1-D OID array");
                memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
 
+               /*
+                * Before creating a new constraint, see whether any existing FKs are
+                * fit for the purpose.  If one is, attach the parent constraint to it,
+                * and don't clone anything.  This way we avoid the expensive
+                * verification step and don't end up with a duplicate FK.  This also
+                * means we don't consider this constraint when recursing to
+                * partitions.
+                */
+               attach_it = false;
+               foreach(cell, partFKs)
+               {
+                       ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
+                       Form_pg_constraint partConstr;
+                       HeapTuple       partcontup;
+
+                       attach_it = true;
+
+                       /*
+                        * Do some quick & easy initial checks.  If any of these fail, we
+                        * cannot use this constraint, but keep looking.
+                        */
+                       if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem)
+                       {
+                               attach_it = false;
+                               continue;
+                       }
+                       for (i = 0; i < nelem; i++)
+                       {
+                               if (fk->conkey[i] != mapped_conkey[i] ||
+                                       fk->confkey[i] != confkey[i] ||
+                                       fk->conpfeqop[i] != conpfeqop[i])
+                               {
+                                       attach_it = false;
+                                       break;
+                               }
+                       }
+                       if (!attach_it)
+                               continue;
+
+                       /*
+                        * Looks good so far; do some more extensive checks.  Presumably
+                        * the check for 'convalidated' could be dropped, since we don't
+                        * really care about that, but let's be careful for now.
+                        */
+                       partcontup = SearchSysCache1(CONSTROID,
+                                                                                ObjectIdGetDatum(fk->conoid));
+                       if (!partcontup)
+                               elog(ERROR, "cache lookup failed for constraint %u",
+                                        fk->conoid);
+                       partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
+                       if (OidIsValid(partConstr->conparentid) ||
+                               !partConstr->convalidated ||
+                               partConstr->condeferrable != constrForm->condeferrable ||
+                               partConstr->condeferred != constrForm->condeferred ||
+                               partConstr->confupdtype != constrForm->confupdtype ||
+                               partConstr->confdeltype != constrForm->confdeltype ||
+                               partConstr->confmatchtype != constrForm->confmatchtype)
+                       {
+                               ReleaseSysCache(partcontup);
+                               attach_it = false;
+                               continue;
+                       }
+
+                       ReleaseSysCache(partcontup);
+
+                       /* looks good!  Attach this constraint */
+                       ConstraintSetParentConstraint(fk->conoid,
+                                                                                 HeapTupleGetOid(tuple));
+                       CommandCounterIncrement();
+                       attach_it = true;
+                       break;
+               }
+
+               /*
+                * If we attached to an existing constraint, there is no need to
+                * create a new one.  In fact, there's no need to recurse for this
+                * constraint to partitions, either.
+                */
+               if (attach_it)
+               {
+                       ReleaseSysCache(tuple);
+                       continue;
+               }
+
                constrOid =
                        CreateConstraintEntry(NameStr(constrForm->conname),
                                                                  constrForm->connamespace,
@@ -547,7 +685,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
                                                                  constrForm->condeferred,
                                                                  constrForm->convalidated,
                                                                  HeapTupleGetOid(tuple),
-                                                                 relationId,
+                                                                 RelationGetRelid(partRel),
                                                                  mapped_conkey,
                                                                  nelem,
                                                                  nelem,
@@ -568,6 +706,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
                                                                  NULL,
                                                                  false,
                                                                  1, false, true);
+               subclone = lappend_oid(subclone, constrOid);
 
                ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
                recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
@@ -580,17 +719,19 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
                fkconstraint->deferrable = constrForm->condeferrable;
                fkconstraint->initdeferred = constrForm->condeferred;
 
-               createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
+               createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
                                                                 constrOid, constrForm->conindid, false);
 
                if (cloned)
                {
+                       ClonedConstraint *newc;
+
                        /*
                         * Feed back caller about the constraints we created, so that they
                         * can set up constraint verification.
                         */
                        newc = palloc(sizeof(ClonedConstraint));
-                       newc->relid = relationId;
+                       newc->relid = RelationGetRelid(partRel);
                        newc->refrelid = constrForm->confrelid;
                        newc->conindid = constrForm->conindid;
                        newc->conid = constrOid;
@@ -598,25 +739,36 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
 
                        *cloned = lappend(*cloned, newc);
                }
+
+               ReleaseSysCache(tuple);
        }
-       systable_endscan(scan);
 
        pfree(attmap);
+       list_free_deep(partFKs);
 
-       if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+       /*
+        * If the partition is partitioned, recurse to handle any constraints that
+        * were cloned.
+        */
+       if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+               subclone != NIL)
        {
-               PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+               PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
                int                     i;
 
                for (i = 0; i < partdesc->nparts; i++)
-                       CloneForeignKeyConstraints(RelationGetRelid(rel),
-                                                                          partdesc->oids[i],
-                                                                          cloned);
+               {
+                       Relation        childRel;
+
+                       childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
+                       clone_fk_constraints(pg_constraint,
+                                                                partRel,
+                                                                childRel,
+                                                                subclone,
+                                                                cloned);
+                       heap_close(childRel, NoLock);   /* keep lock till commit */
+               }
        }
-
-       heap_close(rel, NoLock);        /* keep lock till commit */
-       heap_close(parentRel, NoLock);
-       heap_close(pg_constraint, RowShareLock);
 }
 
 /*
@@ -1028,17 +1180,33 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
                elog(ERROR, "cache lookup failed for constraint %u", childConstrId);
        newtup = heap_copytuple(tuple);
        constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
-       constrForm->conislocal = false;
-       constrForm->coninhcount++;
-       constrForm->conparentid = parentConstrId;
-       CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
-       ReleaseSysCache(tuple);
+       if (OidIsValid(parentConstrId))
+       {
+               constrForm->conislocal = false;
+               constrForm->coninhcount++;
+               constrForm->conparentid = parentConstrId;
+
+               CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
 
-       ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
-       ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
+               ObjectAddressSet(referenced, ConstraintRelationId, parentConstrId);
+               ObjectAddressSet(depender, ConstraintRelationId, childConstrId);
 
-       recordDependencyOn(&depender, &referenced, DEPENDENCY_INTERNAL_AUTO);
+               recordDependencyOn(&depender, &referenced, DEPENDENCY_INTERNAL_AUTO);
+       }
+       else
+       {
+               constrForm->coninhcount--;
+               if (constrForm->coninhcount <= 0)
+                       constrForm->conislocal = true;
+               constrForm->conparentid = InvalidOid;
+
+               deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
+                                                                               ConstraintRelationId,
+                                                                               DEPENDENCY_INTERNAL_AUTO);
+               CatalogTupleUpdate(constrRel, &tuple->t_self, newtup);
+       }
 
+       ReleaseSysCache(tuple);
        heap_close(constrRel, RowExclusiveLock);
 }
 
index e10d3dbf3ddfaac09e0fb1d017656c43cc460b3f..3e112b4ef428d966deca31a4fe485bd44521ddb1 100644 (file)
@@ -14091,6 +14091,11 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
 
        attachrel = heap_openrv(cmd->name, AccessExclusiveLock);
 
+       /*
+        * XXX I think it'd be a good idea to grab locks on all tables referenced
+        * by FKs at this point also.
+        */
+
        /*
         * Must be owner of both parent and source table -- parent was checked by
         * ATSimplePermissions call in ATPrepCmd
@@ -14663,6 +14668,7 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
        ObjectAddress address;
        Oid                     defaultPartOid;
        List       *indexes;
+       List       *fks;
        ListCell   *cell;
 
        /*
@@ -14738,6 +14744,23 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
        }
        heap_close(classRel, RowExclusiveLock);
 
+       /* Detach foreign keys */
+       fks = copyObject(RelationGetFKeyList(partRel));
+       foreach(cell, fks)
+       {
+               ForeignKeyCacheInfo *fk = lfirst(cell);
+               HeapTuple       contup;
+
+               contup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid));
+               if (!contup)
+                       elog(ERROR, "cache lookup failed for constraint %u", fk->conoid);
+
+               ConstraintSetParentConstraint(fk->conoid, InvalidOid);
+
+               ReleaseSysCache(contup);
+       }
+       list_free_deep(fks);
+
        /*
         * Invalidate the parent's relcache so that the partition is no longer
         * included in its partition descriptor.
index e47641d572159f612b9ce3656ff5a5c77d28897e..e8ea59e34afd135b1730ebf4b41e399d36c51780 100644 (file)
@@ -4745,6 +4745,7 @@ _copyForeignKeyCacheInfo(const ForeignKeyCacheInfo *from)
 {
        ForeignKeyCacheInfo *newnode = makeNode(ForeignKeyCacheInfo);
 
+       COPY_SCALAR_FIELD(conoid);
        COPY_SCALAR_FIELD(conrelid);
        COPY_SCALAR_FIELD(confrelid);
        COPY_SCALAR_FIELD(nkeys);
index 9f4ffac91e6b215f6fb049f1d84dd82105e7a020..69731ccdea2edf34698dfbdee7e568f722b3cf5b 100644 (file)
@@ -3633,6 +3633,7 @@ _outForeignKeyCacheInfo(StringInfo str, const ForeignKeyCacheInfo *node)
 
        WRITE_NODE_TYPE("FOREIGNKEYCACHEINFO");
 
+       WRITE_OID_FIELD(conoid);
        WRITE_OID_FIELD(conrelid);
        WRITE_OID_FIELD(confrelid);
        WRITE_INT_FIELD(nkeys);
index a4fc0011031ec4dabc7e5f58feb1c26663cf5147..fd3d010b7783219141f7f25979fde7f61cc5a47c 100644 (file)
@@ -4108,8 +4108,9 @@ RelationGetFKeyList(Relation relation)
        if (relation->rd_fkeyvalid)
                return relation->rd_fkeylist;
 
-       /* Fast path: if it doesn't have any triggers, it can't have FKs */
-       if (!relation->rd_rel->relhastriggers)
+       /* Fast path: non-partitioned tables without triggers can't have FKs */
+       if (!relation->rd_rel->relhastriggers &&
+               relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
                return NIL;
 
        /*
@@ -4144,6 +4145,7 @@ RelationGetFKeyList(Relation relation)
                        continue;
 
                info = makeNode(ForeignKeyCacheInfo);
+               info->conoid = HeapTupleGetOid(htup);
                info->conrelid = constraint->conrelid;
                info->confrelid = constraint->confrelid;
 
index 6ecbdb629443848856ecddcf8198fbd297f61d47..84469f57151976bfe15a884abe606868e874265d 100644 (file)
@@ -202,12 +202,13 @@ typedef struct RelationData
  * The per-FK-column arrays can be fixed-size because we allow at most
  * INDEX_MAX_KEYS columns in a foreign key constraint.
  *
- * Currently, we only cache fields of interest to the planner, but the
- * set of fields could be expanded in future.
+ * Currently, we mostly cache fields of interest to the planner, but the set
+ * of fields has already grown the constraint OID for other uses.
  */
 typedef struct ForeignKeyCacheInfo
 {
        NodeTag         type;
+       Oid                     conoid;                 /* oid of the constraint itself */
        Oid                     conrelid;               /* relation constrained by the foreign key */
        Oid                     confrelid;              /* relation referenced by the foreign key */
        int                     nkeys;                  /* number of columns in the foreign key */
index 4e5cb8901e1c57992c268b1d9ff28d43f2c5012c..52164e89d2917eeee9648666aff8be07b8722bed 100644 (file)
@@ -1648,6 +1648,125 @@ SELECT * FROM fk_partitioned_fk WHERE a = 142857;
 
 -- verify that DROP works
 DROP TABLE fk_partitioned_fk_2;
+-- Test behavior of the constraint together with attaching and detaching
+-- partitions.
+CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_2;
+BEGIN;
+DROP TABLE fk_partitioned_fk;
+-- constraint should still be there
+\d fk_partitioned_fk_2;
+        Table "public.fk_partitioned_fk_2"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 2501
+ b      | integer |           |          | 142857
+Foreign-key constraints:
+    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+ROLLBACK;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+DROP TABLE fk_partitioned_fk_2;
+CREATE TABLE fk_partitioned_fk_2 (b int, c text, a int,
+       FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk ON UPDATE CASCADE ON DELETE CASCADE);
+ALTER TABLE fk_partitioned_fk_2 DROP COLUMN c;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+-- should have only one constraint
+\d fk_partitioned_fk_2
+        Table "public.fk_partitioned_fk_2"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ b      | integer |           |          | 
+ a      | integer |           |          | 
+Partition of: fk_partitioned_fk FOR VALUES IN (1500, 1502)
+Foreign-key constraints:
+    "fk_partitioned_fk_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+DROP TABLE fk_partitioned_fk_2;
+CREATE TABLE fk_partitioned_fk_4 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE) PARTITION BY RANGE (b, a);
+CREATE TABLE fk_partitioned_fk_4_1 PARTITION OF fk_partitioned_fk_4 FOR VALUES FROM (1,1) TO (100,100);
+CREATE TABLE fk_partitioned_fk_4_2 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL);
+ALTER TABLE fk_partitioned_fk_4 ATTACH PARTITION fk_partitioned_fk_4_2 FOR VALUES FROM (100,100) TO (1000,1000);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_4;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+-- should only have one constraint
+\d fk_partitioned_fk_4
+        Table "public.fk_partitioned_fk_4"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk FOR VALUES IN (3500, 3502)
+Partition key: RANGE (b, a)
+Foreign-key constraints:
+    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+Number of partitions: 2 (Use \d+ to list them.)
+
+\d fk_partitioned_fk_4_1
+       Table "public.fk_partitioned_fk_4_1"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk_4 FOR VALUES FROM (1, 1) TO (100, 100)
+Foreign-key constraints:
+    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+-- this one has an FK with mismatched properties
+\d fk_partitioned_fk_4_2
+       Table "public.fk_partitioned_fk_4_2"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk_4 FOR VALUES FROM (100, 100) TO (1000, 1000)
+Foreign-key constraints:
+    "fk_partitioned_fk_4_2_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL
+    "fk_partitioned_fk_4_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
+CREATE TABLE fk_partitioned_fk_5 (a int, b int,
+       FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE,
+       FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE)
+  PARTITION BY RANGE (a);
+CREATE TABLE fk_partitioned_fk_5_1 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_5;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+-- this one has two constraints, similar but not quite the one in the parent,
+-- so it gets a new one
+\d fk_partitioned_fk_5
+        Table "public.fk_partitioned_fk_5"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk FOR VALUES IN (4500)
+Partition key: RANGE (a)
+Foreign-key constraints:
+    "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE
+    "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE
+    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+Number of partitions: 1 (Use \d+ to list them.)
+
+-- verify that it works to reattaching a child with multiple candidate
+-- constraints
+ALTER TABLE fk_partitioned_fk_5 DETACH PARTITION fk_partitioned_fk_5_1;
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+\d fk_partitioned_fk_5_1
+       Table "public.fk_partitioned_fk_5_1"
+ Column |  Type   | Collation | Nullable | Default 
+--------+---------+-----------+----------+---------
+ a      | integer |           |          | 
+ b      | integer |           |          | 
+Partition of: fk_partitioned_fk_5 FOR VALUES FROM (0) TO (10)
+Foreign-key constraints:
+    "fk_partitioned_fk_5_1_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b)
+    "fk_partitioned_fk_5_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE
+    "fk_partitioned_fk_5_a_fkey1" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE
+    "fk_partitioned_fk_a_fkey" FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE
+
 -- verify that attaching a table checks that the existing data satisfies the
 -- constraint
 CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);
index 6fcb5dfb4eb3b001e3fdb2cafc646a47bd38d994..f3870048551f546a908d6ee74fc196b0d36fb868 100644 (file)
@@ -1226,6 +1226,56 @@ SELECT * FROM fk_partitioned_fk WHERE a = 142857;
 -- verify that DROP works
 DROP TABLE fk_partitioned_fk_2;
 
+-- Test behavior of the constraint together with attaching and detaching
+-- partitions.
+CREATE TABLE fk_partitioned_fk_2 PARTITION OF fk_partitioned_fk FOR VALUES IN (1500,1502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_2;
+BEGIN;
+DROP TABLE fk_partitioned_fk;
+-- constraint should still be there
+\d fk_partitioned_fk_2;
+ROLLBACK;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+DROP TABLE fk_partitioned_fk_2;
+CREATE TABLE fk_partitioned_fk_2 (b int, c text, a int,
+       FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk ON UPDATE CASCADE ON DELETE CASCADE);
+ALTER TABLE fk_partitioned_fk_2 DROP COLUMN c;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2 FOR VALUES IN (1500,1502);
+-- should have only one constraint
+\d fk_partitioned_fk_2
+DROP TABLE fk_partitioned_fk_2;
+
+CREATE TABLE fk_partitioned_fk_4 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE) PARTITION BY RANGE (b, a);
+CREATE TABLE fk_partitioned_fk_4_1 PARTITION OF fk_partitioned_fk_4 FOR VALUES FROM (1,1) TO (100,100);
+CREATE TABLE fk_partitioned_fk_4_2 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE SET NULL);
+ALTER TABLE fk_partitioned_fk_4 ATTACH PARTITION fk_partitioned_fk_4_2 FOR VALUES FROM (100,100) TO (1000,1000);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_4;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_4 FOR VALUES IN (3500,3502);
+-- should only have one constraint
+\d fk_partitioned_fk_4
+\d fk_partitioned_fk_4_1
+-- this one has an FK with mismatched properties
+\d fk_partitioned_fk_4_2
+
+CREATE TABLE fk_partitioned_fk_5 (a int, b int,
+       FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE,
+       FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk(a, b) MATCH FULL ON UPDATE CASCADE ON DELETE CASCADE)
+  PARTITION BY RANGE (a);
+CREATE TABLE fk_partitioned_fk_5_1 (a int, b int, FOREIGN KEY (a, b) REFERENCES fk_notpartitioned_pk);
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+ALTER TABLE fk_partitioned_fk DETACH PARTITION fk_partitioned_fk_5;
+ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_5 FOR VALUES IN (4500);
+-- this one has two constraints, similar but not quite the one in the parent,
+-- so it gets a new one
+\d fk_partitioned_fk_5
+-- verify that it works to reattaching a child with multiple candidate
+-- constraints
+ALTER TABLE fk_partitioned_fk_5 DETACH PARTITION fk_partitioned_fk_5_1;
+ALTER TABLE fk_partitioned_fk_5 ATTACH PARTITION fk_partitioned_fk_5_1 FOR VALUES FROM (0) TO (10);
+\d fk_partitioned_fk_5_1
+
 -- verify that attaching a table checks that the existing data satisfies the
 -- constraint
 CREATE TABLE fk_partitioned_fk_2 (a int, b int) PARTITION BY RANGE (b);