Fix TRUNCATE .. CASCADE on partitions
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 7 Feb 2020 20:09:36 +0000 (17:09 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 7 Feb 2020 20:09:36 +0000 (17:09 -0300)
When running TRUNCATE CASCADE on a child of a partitioned table
referenced by another partitioned table, the truncate was not applied to
partitions of the referencing table; this could leave rows violating the
constraint in the referencing partitioned table.  Repair by walking the
pg_constraint chain all the way up to the topmost referencing table.

Note: any partitioned tables containing FKs that reference other
partitioned tables should be checked for possible violating rows, if
TRUNCATE has occurred in partitions of the referenced table.

Reported-by: Christophe Courtois
Author: Jehan-Guillaume de Rorthais
Discussion: https://postgr.es/m/20200204183906.115f693e@firost

doc/src/sgml/ref/truncate.sgml
src/backend/catalog/heap.c
src/test/regress/expected/truncate.out
src/test/regress/sql/truncate.sql

index c1e42376abddb92e5a912462dc9d9606435fff0a..5922ee579e1102a4e91488438a9c04278095625c 100644 (file)
@@ -124,6 +124,9 @@ TRUNCATE [ TABLE ] [ ONLY ] <replaceable class="parameter">name</replaceable> [
    option can be used to automatically include all dependent tables &mdash;
    but be very careful when using this option, or else you might lose data you
    did not intend to!
+   Note in particular that when the table to be truncated is a partition,
+   siblings partitions are left untouched, but cascading occurs to all
+   referencing tables and all their partitions with no distinction.
   </para>
 
   <para>
index 046b3d37cedf910a2e77c84ac10da9afdbe76bd2..e31478bf9177c01b9f626be4934f6d000e7d566a 100644 (file)
@@ -3396,9 +3396,16 @@ List *
 heap_truncate_find_FKs(List *relationIds)
 {
        List       *result = NIL;
+       List       *oids = list_copy(relationIds);
+       List       *parent_cons;
+       ListCell   *cell;
+       ScanKeyData key;
        Relation        fkeyRel;
        SysScanDesc fkeyScan;
        HeapTuple       tuple;
+       bool            restart;
+
+       oids = list_copy(relationIds);
 
        /*
         * Must scan pg_constraint.  Right now, it is a seqscan because there is
@@ -3406,6 +3413,10 @@ heap_truncate_find_FKs(List *relationIds)
         */
        fkeyRel = table_open(ConstraintRelationId, AccessShareLock);
 
+restart:
+       restart = false;
+       parent_cons = NIL;
+
        fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
                                                                  NULL, 0, NULL);
 
@@ -3418,16 +3429,85 @@ heap_truncate_find_FKs(List *relationIds)
                        continue;
 
                /* Not referencing one of our list of tables */
-               if (!list_member_oid(relationIds, con->confrelid))
+               if (!list_member_oid(oids, con->confrelid))
                        continue;
 
-               /* Add referencer to result, unless present in input list */
+               /*
+                * If this constraint has a parent constraint which we have not seen
+                * yet, keep track of it for the second loop, below.  Tracking parent
+                * constraints allows us to climb up to the top-level level constraint
+                * and look for all possible relations referencing the partitioned
+                * table.
+                */
+               if (OidIsValid(con->conparentid) &&
+                       !list_member_oid(parent_cons, con->conparentid))
+                       parent_cons = lappend_oid(parent_cons, con->conparentid);
+
+               /*
+                * Add referencer to result, unless present in input list.  (Don't
+                * worry about dupes: we'll fix that below).
+                */
                if (!list_member_oid(relationIds, con->conrelid))
                        result = lappend_oid(result, con->conrelid);
        }
 
        systable_endscan(fkeyScan);
+
+       /*
+        * Process each parent constraint we found to add the list of referenced
+        * relations by them to the oids list.  If we do add any new such
+        * relations, redo the first loop above.  Also, if we see that the parent
+        * constraint in turn has a parent, add that so that we process all
+        * relations in a single additional pass.
+        */
+       foreach(cell, parent_cons)
+       {
+               Oid             parent = lfirst_oid(cell);
+
+               ScanKeyInit(&key,
+                                       Anum_pg_constraint_oid,
+                                       BTEqualStrategyNumber, F_OIDEQ,
+                                       ObjectIdGetDatum(parent));
+
+               fkeyScan = systable_beginscan(fkeyRel, ConstraintOidIndexId,
+                                                                         true, NULL, 1, &key);
+
+               tuple = systable_getnext(fkeyScan);
+               if (HeapTupleIsValid(tuple))
+               {
+                       Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
+
+                       /*
+                        * pg_constraint rows always appear for partitioned hierarchies
+                        * this way: on the each side of the constraint, one row appears
+                        * for each partition that points to the top-most table on the
+                        * other side.
+                        *
+                        * Because of this arrangement, we can correctly catch all
+                        * relevant relations by adding to 'parent_cons' all rows with
+                        * valid conparentid, and to the 'oids' list all rows with a
+                        * zero conparentid.  If any oids are added to 'oids', redo the
+                        * first loop above by setting 'restart'.
+                        */
+                       if (OidIsValid(con->conparentid))
+                               parent_cons = list_append_unique_oid(parent_cons,
+                                                                                                        con->conparentid);
+                       else if (!list_member_oid(oids, con->confrelid))
+                       {
+                               oids = lappend_oid(oids, con->confrelid);
+                               restart = true;
+                       }
+               }
+
+               systable_endscan(fkeyScan);
+       }
+
+       list_free(parent_cons);
+       if (restart)
+               goto restart;
+
        table_close(fkeyRel, AccessShareLock);
+       list_free(oids);
 
        /* Now sort and de-duplicate the result list */
        list_sort(result, list_oid_cmp);
index cc68274dca5c0046011bd7a80c87594c6909e2ab..1e88e867bf2296cd58762a770834100805b59a55 100644 (file)
@@ -542,3 +542,53 @@ SELECT * FROM tp_chk_data();
 
 DROP TABLE truncprim, truncpart;
 DROP FUNCTION tp_ins_data(), tp_chk_data();
+-- test cascade when referencing a partitioned table
+CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
+CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
+  PARTITION BY RANGE (a);
+CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
+CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
+CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
+CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
+INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
+-- truncate a partition cascading to a table
+CREATE TABLE ref_b (
+    b INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+);
+INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
+TRUNCATE TABLE trunc_a1 CASCADE;
+NOTICE:  truncate cascades to table "ref_b"
+SELECT a FROM ref_b;
+ a 
+---
+(0 rows)
+
+DROP TABLE ref_b;
+-- truncate a partition cascading to a partitioned table
+CREATE TABLE ref_c (
+    c INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+) PARTITION BY RANGE (c);
+CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
+CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
+INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
+TRUNCATE TABLE trunc_a21 CASCADE;
+NOTICE:  truncate cascades to table "ref_c"
+NOTICE:  truncate cascades to table "ref_c1"
+NOTICE:  truncate cascades to table "ref_c2"
+SELECT a as "from table ref_c" FROM ref_c;
+ from table ref_c 
+------------------
+(0 rows)
+
+SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
+ from table trunc_a 
+--------------------
+                 15
+                 20
+                 25
+(3 rows)
+
+DROP TABLE trunc_a, ref_c;
index 28395e82bf8578bea3902b90f1e7d668d2565e79..54f26e3077a045046bcf2b46ee037c8e23f682ee 100644 (file)
@@ -289,3 +289,41 @@ TRUNCATE TABLE truncpart;
 SELECT * FROM tp_chk_data();
 DROP TABLE truncprim, truncpart;
 DROP FUNCTION tp_ins_data(), tp_chk_data();
+
+-- test cascade when referencing a partitioned table
+CREATE TABLE trunc_a (a INT PRIMARY KEY) PARTITION BY RANGE (a);
+CREATE TABLE trunc_a1 PARTITION OF trunc_a FOR VALUES FROM (0) TO (10);
+CREATE TABLE trunc_a2 PARTITION OF trunc_a FOR VALUES FROM (10) TO (20)
+  PARTITION BY RANGE (a);
+CREATE TABLE trunc_a21 PARTITION OF trunc_a2 FOR VALUES FROM (10) TO (12);
+CREATE TABLE trunc_a22 PARTITION OF trunc_a2 FOR VALUES FROM (12) TO (16);
+CREATE TABLE trunc_a2d PARTITION OF trunc_a2 DEFAULT;
+CREATE TABLE trunc_a3 PARTITION OF trunc_a FOR VALUES FROM (20) TO (30);
+INSERT INTO trunc_a VALUES (0), (5), (10), (15), (20), (25);
+
+-- truncate a partition cascading to a table
+CREATE TABLE ref_b (
+    b INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+);
+INSERT INTO ref_b VALUES (10, 0), (50, 5), (100, 10), (150, 15);
+
+TRUNCATE TABLE trunc_a1 CASCADE;
+SELECT a FROM ref_b;
+
+DROP TABLE ref_b;
+
+-- truncate a partition cascading to a partitioned table
+CREATE TABLE ref_c (
+    c INT PRIMARY KEY,
+    a INT REFERENCES trunc_a(a) ON DELETE CASCADE
+) PARTITION BY RANGE (c);
+CREATE TABLE ref_c1 PARTITION OF ref_c FOR VALUES FROM (100) TO (200);
+CREATE TABLE ref_c2 PARTITION OF ref_c FOR VALUES FROM (200) TO (300);
+INSERT INTO ref_c VALUES (100, 10), (150, 15), (200, 20), (250, 25);
+
+TRUNCATE TABLE trunc_a21 CASCADE;
+SELECT a as "from table ref_c" FROM ref_c;
+SELECT a as "from table trunc_a" FROM trunc_a ORDER BY a;
+
+DROP TABLE trunc_a, ref_c;