Fix row filters with multiple publications
author: Tomas Vondra <tomas.vondra@postgresql.org>
Thu, 17 Mar 2022 16:03:45 +0000 (17:03 +0100)
committer: Tomas Vondra <tomas.vondra@postgresql.org>
Thu, 17 Mar 2022 16:03:48 +0000 (17:03 +0100)
When publishing changes through a partition root, we should use the row
filter for the top-most ancestor. The relation may be added to multiple
publications, using different ancestors, and 52e4f0cd47 handled this
incorrectly. With c91f71b9dc we find the correct top-most ancestor, but
the code tried to fetch the row filter from all publications, including
those using a different ancestor etc. No row filter can be found for
such publications, which was treated as replicating all rows.

Similarly to c91f71b9dc, this seems to be a rare issue in practice. It
requires multiple publications including the same partitioned relation,
through different ancestors.

Fixed by only passing publications containing the top-most ancestor to
pgoutput_row_filter_init(), so that treating a missing row filter as
replicating all rows is correct.

Report and fix by me, test case by Hou zj. Reviews and improvements by
Amit Kapila.

Author: Tomas Vondra, Hou zj, Amit Kapila
Reviewed-by: Amit Kapila, Hou zj
Discussion: https://postgr.es/m/d26d24dd-2fab-3c48-0162-2b7f84a9c893%40enterprisedb.com

src/backend/replication/pgoutput/pgoutput.c
src/test/subscription/t/028_row_filter.pl

index d869f3e93ebf0d2bd666ce45914811cf4038ac70..5fddab3a3d4aa39e6c6b786d686f41af923ade3d 100644 (file)
@@ -1890,8 +1890,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
                entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
                entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 
-               rel_publications = lappend(rel_publications, pub);
-
                /*
                 * We want to publish the changes as the top-most ancestor
                 * across all publications. So we need to check if the
@@ -1902,9 +1900,27 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
                if (publish_ancestor_level > ancestor_level)
                    continue;
 
-               /* The new value is an ancestor, so let's keep it. */
-               publish_as_relid = pub_relid;
-               publish_ancestor_level = ancestor_level;
+               /*
+                * If we found an ancestor higher up in the tree, discard
+                * the list of publications through which we replicate it,
+                * and use the new ancestor.
+                */
+               if (publish_ancestor_level < ancestor_level)
+               {
+                   publish_as_relid = pub_relid;
+                   publish_ancestor_level = ancestor_level;
+
+                   /* reset the publication list for this relation */
+                   rel_publications = NIL;
+               }
+               else
+               {
+                   /* Same ancestor level, has to be the same OID. */
+                   Assert(publish_as_relid == pub_relid);
+               }
+
+               /* Track publications for this ancestor. */
+               rel_publications = lappend(rel_publications, pub);
            }
        }
 
index 89bb364e9da9a8c1342f109394f621c967f05462..82c4eb6ef627c969481a6a05af1a8d271f83de78 100644 (file)
@@ -237,6 +237,11 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
    "CREATE TABLE tab_rowfilter_child (b text) INHERITS (tab_rowfilter_inherited)"
 );
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_viaroot_part (a int) PARTITION BY RANGE (a)");
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_viaroot_part_1 PARTITION OF tab_rowfilter_viaroot_part FOR VALUES FROM (1) TO (20)"
+);
 
 # setup structure on subscriber
 $node_subscriber->safe_psql('postgres',
@@ -283,6 +288,11 @@ $node_subscriber->safe_psql('postgres',
 $node_subscriber->safe_psql('postgres',
    "CREATE TABLE tab_rowfilter_child (b text) INHERITS (tab_rowfilter_inherited)"
 );
+$node_subscriber->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_viaroot_part (a int)");
+$node_subscriber->safe_psql('postgres',
+   "CREATE TABLE tab_rowfilter_viaroot_part_1 (a int)"
+);
 
 # setup logical replication
 $node_publisher->safe_psql('postgres',
@@ -330,6 +340,15 @@ $node_publisher->safe_psql('postgres',
    "CREATE PUBLICATION tap_pub_inherits FOR TABLE tab_rowfilter_inherited WHERE (a > 15)"
 );
 
+# two publications, each publishing the partition through a different ancestor, with
+# different row filters
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION tap_pub_viaroot_1 FOR TABLE tab_rowfilter_viaroot_part WHERE (a > 15) WITH (publish_via_partition_root)"
+);
+$node_publisher->safe_psql('postgres',
+   "CREATE PUBLICATION tap_pub_viaroot_2 FOR TABLE tab_rowfilter_viaroot_part_1 WHERE (a < 15) WITH (publish_via_partition_root)"
+);
+
 #
 # The following INSERTs are executed before the CREATE SUBSCRIPTION, so these
 # SQL commands are for testing the initial data copy using logical replication.
@@ -376,7 +395,7 @@ $node_publisher->safe_psql('postgres',
 );
 
 $node_subscriber->safe_psql('postgres',
-   "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits"
+   "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1"
 );
 
 $node_publisher->wait_for_catchup($appname);
@@ -534,6 +553,8 @@ $node_publisher->safe_psql('postgres',
    "INSERT INTO tab_rowfilter_inherited (a) VALUES (14), (16)");
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab_rowfilter_child (a, b) VALUES (13, '13'), (17, '17')");
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO tab_rowfilter_viaroot_part (a) VALUES (14), (15), (16)");
 
 $node_publisher->wait_for_catchup($appname);
 
@@ -688,6 +709,30 @@ $result =
    "SELECT a = repeat('1234567890', 200), b FROM tab_rowfilter_toast");
 is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast');
 
+# Check expected replicated rows for tab_rowfilter_viaroot_part and
+# tab_rowfilter_viaroot_part_1. We should replicate only rows matching
+# the row filter for the top-level ancestor:
+#
+# tab_rowfilter_viaroot_part filter is: (a > 15)
+# - INSERT (14)        NO, 14 < 15
+# - INSERT (15)        NO, 15 = 15
+# - INSERT (16)        YES, 16 > 15
+$result =
+  $node_subscriber->safe_psql('postgres',
+   "SELECT a FROM tab_rowfilter_viaroot_part");
+is( $result, qq(16),
+   'check replicated rows to tab_rowfilter_viaroot_part'
+);
+
+# Check there is no data in tab_rowfilter_viaroot_part_1 because rows are
+# replicated via the top most parent table tab_rowfilter_viaroot_part
+$result =
+  $node_subscriber->safe_psql('postgres',
+   "SELECT a FROM tab_rowfilter_viaroot_part_1");
+is( $result, qq(),
+   'check replicated rows to tab_rowfilter_viaroot_part_1'
+);
+
 # Testcase end: FOR TABLE with row filter publications
 # ======================================================