Support adding partitioned tables to publication
authorPeter Eisentraut <peter@eisentraut.org>
Tue, 10 Mar 2020 07:42:59 +0000 (08:42 +0100)
committerPeter Eisentraut <peter@eisentraut.org>
Tue, 10 Mar 2020 08:09:32 +0000 (09:09 +0100)
When a partitioned table is added to a publication, changes of all of
its partitions (current or future) are published via that publication.

This change only affects which tables a publication considers as its
members.  The receiving side still sees the data coming from the
individual leaf partitions.  So existing restrictions that partition
hierarchies can only be replicated one-to-one are not changed by this.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com

doc/src/sgml/logical-replication.sgml
doc/src/sgml/ref/create_publication.sgml
src/backend/catalog/pg_publication.c
src/backend/commands/publicationcmds.c
src/backend/replication/logical/tablesync.c
src/backend/replication/pgoutput/pgoutput.c
src/bin/pg_dump/pg_dump.c
src/include/catalog/pg_publication.h
src/test/regress/expected/publication.out
src/test/regress/sql/publication.sql
src/test/subscription/t/013_partition.pl [new file with mode: 0644]

index f657d1d06e0049ab1fba1739f8e74d4388d1474c..8bd7c9c8ac0941c8fb4290c3c90b32d53d451043 100644 (file)
 
    <listitem>
     <para>
-     Replication is only possible from base tables to base tables.  That is,
-     the tables on the publication and on the subscription side must be normal
-     tables, not views, materialized views, partition root tables, or foreign
-     tables.  In the case of partitions, you can therefore replicate a
-     partition hierarchy one-to-one, but you cannot currently replicate to a
-     differently partitioned setup.  Attempts to replicate tables other than
-     base tables will result in an error.
+     Replication is only supported by tables, partitioned or not, although a
+     given table must either be partitioned on both servers or not partitioned
+     at all.  Also, when replicating between partitioned tables, the actual
+     replication occurs between leaf partitions, so partitions on the two
+     servers must match one-to-one.
+    </para>
+
+    <para>
+     Attempts to replicate other types of relations such as views, materialized
+     views, or foreign tables, will result in an error.
     </para>
    </listitem>
   </itemizedlist>
index 99f87ca39386949f39c4412820859d7470877116..597cb28f3397dce294e13c05ba47512aea9f85fc 100644 (file)
@@ -69,14 +69,23 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       specified, the table and all its descendant tables (if any) are added.
       Optionally, <literal>*</literal> can be specified after the table name to
       explicitly indicate that descendant tables are included.
+      This does not apply to a partitioned table, however.  The partitions of
+      a partitioned table are always implicitly considered part of the
+      publication, so they are never explicitly added to the publication.
      </para>
 
      <para>
-      Only persistent base tables can be part of a publication.  Temporary
-      tables, unlogged tables, foreign tables, materialized views, regular
-      views, and partitioned tables cannot be part of a publication.  To
-      replicate a partitioned table, add the individual partitions to the
-      publication.
+      Only persistent base tables and partitioned tables can be part of a
+      publication.  Temporary tables, unlogged tables, foreign tables,
+      materialized views, and regular views cannot be part of a publication.
+     </para>
+
+     <para>
+      When a partitioned table is added to a publication, all of its existing
+      and future partitions are implicitly considered to be part of the
+      publication.  So, even operations that are performed directly on a
+      partition are also published via publications that its ancestors are
+      part of.
      </para>
     </listitem>
    </varlistentry>
index c5eea7af3fb9577badd5d547fb5dc37dee90e18f..500a5ae1ee0b9c453a5f5c4e7527ae4fccb08860 100644 (file)
 #include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
+#include "catalog/partition.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/pg_inherits.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_type.h"
@@ -40,6 +42,8 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
+static List *get_rel_publications(Oid relid);
+
 /*
  * Check if relation can be in given publication and throws appropriate
  * error if not.
 static void
 check_publication_add_relation(Relation targetrel)
 {
-       /* Give more specific error for partitioned tables */
-       if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE)
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("\"%s\" is a partitioned table",
-                                               RelationGetRelationName(targetrel)),
-                                errdetail("Adding partitioned tables to publications is not supported."),
-                                errhint("You can add the table partitions individually.")));
-
-       /* Must be table */
-       if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION)
+       /* Must be a regular or partitioned table */
+       if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION &&
+               RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                 errmsg("\"%s\" is not a table",
@@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel)
 static bool
 is_publishable_class(Oid relid, Form_pg_class reltuple)
 {
-       return reltuple->relkind == RELKIND_RELATION &&
+       return (reltuple->relkind == RELKIND_RELATION ||
+                       reltuple->relkind == RELKIND_PARTITIONED_TABLE) &&
                !IsCatalogRelationOid(relid) &&
                reltuple->relpersistence == RELPERSISTENCE_PERMANENT &&
                relid >= FirstNormalObjectId;
@@ -221,10 +218,35 @@ publication_add_relation(Oid pubid, Relation targetrel,
 
 
 /*
- * Gets list of publication oids for a relation oid.
+ * Gets list of publication oids for a relation, plus those of ancestors,
+ * if any, if the relation is a partition.
  */
 List *
 GetRelationPublications(Oid relid)
+{
+       List       *result = NIL;
+
+       result = get_rel_publications(relid);
+       if (get_rel_relispartition(relid))
+       {
+               List       *ancestors = get_partition_ancestors(relid);
+               ListCell   *lc;
+
+               foreach(lc, ancestors)
+               {
+                       Oid                     ancestor = lfirst_oid(lc);
+                       List       *ancestor_pubs = get_rel_publications(ancestor);
+
+                       result = list_concat(result, ancestor_pubs);
+               }
+       }
+
+       return result;
+}
+
+/* Workhorse of GetRelationPublications() */
+static List *
+get_rel_publications(Oid relid)
 {
        List       *result = NIL;
        CatCList   *pubrellist;
@@ -253,7 +275,7 @@ GetRelationPublications(Oid relid)
  * should use GetAllTablesPublicationRelations().
  */
 List *
-GetPublicationRelations(Oid pubid)
+GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 {
        List       *result;
        Relation        pubrelsrel;
@@ -279,7 +301,31 @@ GetPublicationRelations(Oid pubid)
 
                pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
 
-               result = lappend_oid(result, pubrel->prrelid);
+               if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE &&
+                       pub_partopt != PUBLICATION_PART_ROOT)
+               {
+                       List       *all_parts = find_all_inheritors(pubrel->prrelid, NoLock,
+                                                                                                               NULL);
+
+                       if (pub_partopt == PUBLICATION_PART_ALL)
+                               result = list_concat(result, all_parts);
+                       else if (pub_partopt == PUBLICATION_PART_LEAF)
+                       {
+                               ListCell   *lc;
+
+                               foreach(lc, all_parts)
+                               {
+                                       Oid                     partOid = lfirst_oid(lc);
+
+                                       if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE)
+                                               result = lappend_oid(result, partOid);
+                               }
+                       }
+                       else
+                               Assert(false);
+               }
+               else
+                       result = lappend_oid(result, pubrel->prrelid);
        }
 
        systable_endscan(scan);
@@ -480,10 +526,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
                oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
                publication = GetPublicationByName(pubname, false);
+
+               /*
+                * Publications support partitioned tables, although all changes are
+                * replicated using leaf partition identity and schema, so we only
+                * need those.
+                */
                if (publication->alltables)
                        tables = GetAllTablesPublicationRelations();
                else
-                       tables = GetPublicationRelations(publication->oid);
+                       tables = GetPublicationRelations(publication->oid,
+                                                                                        PUBLICATION_PART_LEAF);
                funcctx->user_fctx = (void *) tables;
 
                MemoryContextSwitchTo(oldcontext);
index eb4d22cc2a8987179fce11965661ce95b135c3d3..768c2184e13e25707e9edcfa2bb688ec210c423b 100644 (file)
@@ -299,7 +299,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
        }
        else
        {
-               List       *relids = GetPublicationRelations(pubform->oid);
+               /*
+                * For any partitioned tables contained in the publication, we must
+                * invalidate all partitions contained in the respective partition
+                * trees, not just those explicitly mentioned in the publication.
+                */
+               List   *relids = GetPublicationRelations(pubform->oid,
+                                                                                                PUBLICATION_PART_ALL);
 
                /*
                 * We don't want to send too many individual messages, at some point
@@ -356,7 +362,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
                PublicationDropTables(pubid, rels, false);
        else                                            /* DEFELEM_SET */
        {
-               List       *oldrelids = GetPublicationRelations(pubid);
+               List   *oldrelids = GetPublicationRelations(pubid,
+                                                                                                       PUBLICATION_PART_ROOT);
                List       *delrels = NIL;
                ListCell   *oldlc;
 
@@ -498,7 +505,8 @@ RemovePublicationRelById(Oid proid)
 
 /*
  * Open relations specified by a RangeVar list.
- * The returned tables are locked in ShareUpdateExclusiveLock mode.
+ * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
+ * add them to a publication.
  */
 static List *
 OpenTableList(List *tables)
@@ -539,8 +547,13 @@ OpenTableList(List *tables)
                rels = lappend(rels, rel);
                relids = lappend_oid(relids, myrelid);
 
-               /* Add children of this rel, if requested */
-               if (recurse)
+               /*
+                * Add children of this rel, if requested, so that they too are added
+                * to the publication.  A partitioned table can't have any inheritance
+                * children other than its partitions, which need not be explicitly
+                * added to the publication.
+                */
+               if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
                {
                        List       *children;
                        ListCell   *child;
index f8183cd488c985cfea99fbd999c27fb5baeb453b..98825f01e98663d377662bbde7fa8db868ae0376 100644 (file)
@@ -761,6 +761,7 @@ copy_table(Relation rel)
        /* Map the publisher relation to local one. */
        relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
        Assert(rel == relmapentry->localrel);
+       Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
 
        /* Start copy on the publisher. */
        initStringInfo(&cmd);
index 752508213af1d1774a96dc93187f1b459b25e566..552a70cffa5590261d2366a2f3b3218fd9a241e2 100644 (file)
@@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
                                                                                uint32 hashvalue);
 
-/* Entry in the map used to remember which relation schemas we sent. */
+/*
+ * Entry in the map used to remember which relation schemas we sent.
+ *
+ * For partitions, 'pubactions' considers not only the table's own
+ * publications, but also those of all of its ancestors.
+ */
 typedef struct RelationSyncEntry
 {
        Oid                     relid;                  /* relation oid */
@@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                if (!relentry->pubactions.pubtruncate)
                        continue;
 
+               /*
+                * Don't send partitioned tables, because partitions should be sent
+                * instead.
+                */
+               if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+                       continue;
+
                relids[nrelids++] = relid;
                maybe_send_schema(ctx, relation, relentry);
        }
@@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 /*
  * Find or create entry in the relation schema cache.
+ *
+ * This looks up publications that the given relation is directly or
+ * indirectly part of (the latter if it's really the relation's ancestor that
+ * is part of a publication) and fills up the found entry with the information
+ * about which operations to publish.
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Oid relid)
index a12c8d011bcc02b410c65bb617e8b97f04891ab0..ad039e97a5b70613c2c29534fe3012406348b472 100644 (file)
@@ -3981,8 +3981,12 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
        {
                TableInfo  *tbinfo = &tblinfo[i];
 
-               /* Only plain tables can be aded to publications. */
-               if (tbinfo->relkind != RELKIND_RELATION)
+               /*
+                * Only regular and partitioned tables can be added to
+                * publications.
+                */
+               if (tbinfo->relkind != RELKIND_RELATION &&
+                       tbinfo->relkind != RELKIND_PARTITIONED_TABLE)
                        continue;
 
                /*
index 6cdc2b1197075350b522b0b1eb81a780e488060f..bb52e8c5e0853262c15cd085900210225d90bfc8 100644 (file)
@@ -80,7 +80,24 @@ typedef struct Publication
 extern Publication *GetPublication(Oid pubid);
 extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
 extern List *GetRelationPublications(Oid relid);
-extern List *GetPublicationRelations(Oid pubid);
+
+/*---------
+ * Expected values for pub_partopt parameter of GetRelationPublications(),
+ * which allows callers to specify which partitions of partitioned tables
+ * mentioned in the publication they expect to see.
+ *
+ *     ROOT:   only the table explicitly mentioned in the publication
+ *     LEAF:   only leaf partitions in given tree
+ *     ALL:    all partitions in given tree
+ */
+typedef enum PublicationPartOpt
+{
+       PUBLICATION_PART_ROOT,
+       PUBLICATION_PART_LEAF,
+       PUBLICATION_PART_ALL,
+}                      PublicationPartOpt;
+
+extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
 extern List *GetAllTablesPublicationRelations(void);
 
index feb51e4add7f592154a9eb9467858f9857a822a8..2634d2c1e142bf5affb4d11ca413456b4f62b361 100644 (file)
@@ -116,6 +116,35 @@ Tables:
 
 DROP TABLE testpub_tbl3, testpub_tbl3a;
 DROP PUBLICATION testpub3, testpub4;
+-- Tests for partitioned tables
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_forparted;
+CREATE PUBLICATION testpub_forparted1;
+RESET client_min_messages;
+CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+-- works despite missing REPLICA IDENTITY, because updates are not replicated
+UPDATE testpub_parted1 SET a = 1;
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+-- only parent is listed as being in publication, not the partition
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
+\dRp+ testpub_forparted
+                          Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
+--------------------------+------------+---------+---------+---------+-----------
+ regress_publication_user | f          | t       | t       | t       | t
+Tables:
+    "public.testpub_parted"
+
+-- should now fail, because parent's publication replicates updates
+UPDATE testpub_parted1 SET a = 1;
+ERROR:  cannot update table "testpub_parted1" because it does not have a replica identity and publishes updates
+HINT:  To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
+ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
+-- works again, because parent's publication is no longer considered
+UPDATE testpub_parted1 SET a = 1;
+DROP TABLE testpub_parted1;
+DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 ERROR:  "testpub_view" is not a table
@@ -142,11 +171,6 @@ Tables:
 ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
 ERROR:  "testpub_view" is not a table
 DETAIL:  Only tables can be added to publications.
--- fail - partitioned table
-ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
-ERROR:  "testpub_parted" is a partitioned table
-DETAIL:  Adding partitioned tables to publications is not supported.
-HINT:  You can add the table partitions individually.
 ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk;
index 5773a755cf3a8e8cbf8a4bd041cd09873e702fe0..219e04129d2d9e8508c6888a9ba6ae9dfa88c5dd 100644 (file)
@@ -69,6 +69,27 @@ RESET client_min_messages;
 DROP TABLE testpub_tbl3, testpub_tbl3a;
 DROP PUBLICATION testpub3, testpub4;
 
+-- Tests for partitioned tables
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_forparted;
+CREATE PUBLICATION testpub_forparted1;
+RESET client_min_messages;
+CREATE TABLE testpub_parted1 (LIKE testpub_parted);
+ALTER PUBLICATION testpub_forparted1 SET (publish='insert');
+-- works despite missing REPLICA IDENTITY, because updates are not replicated
+UPDATE testpub_parted1 SET a = 1;
+ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
+-- only parent is listed as being in publication, not the partition
+ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
+\dRp+ testpub_forparted
+-- should now fail, because parent's publication replicates updates
+UPDATE testpub_parted1 SET a = 1;
+ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
+-- works again, because parent's publication is no longer considered
+UPDATE testpub_parted1 SET a = 1;
+DROP TABLE testpub_parted1;
+DROP PUBLICATION testpub_forparted, testpub_forparted1;
+
 -- fail - view
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view;
 SET client_min_messages = 'ERROR';
@@ -83,8 +104,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 
 -- fail - view
 ALTER PUBLICATION testpub_default ADD TABLE testpub_view;
--- fail - partitioned table
-ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted;
 
 ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1;
 ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1;
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
new file mode 100644 (file)
index 0000000..ea5812c
--- /dev/null
@@ -0,0 +1,178 @@
+# Test logical replication with partitioned tables
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 15;
+
+# setup
+
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber1 = get_new_node('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $node_subscriber2 = get_new_node('subscriber2');
+$node_subscriber2->init(allows_streaming => 'logical');
+$node_subscriber2->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+# publisher
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub1");
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub_all FOR ALL TABLES");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+       "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)");
+$node_publisher->safe_psql('postgres',
+       "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1");
+
+# subscriber1
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+       "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1");
+
+# subscriber 2
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all");
+
+# Wait for initial sync of all subscriptions
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1 VALUES (1)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1_1 (a) VALUES (3)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1_2 VALUES (5)");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+my $result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
+is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
+is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated');
+
+# update (no partition change)
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
+is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated');
+
+# update (partition changes)
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 6 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
+is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
+is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
+is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated');
+
+# delete
+$node_publisher->safe_psql('postgres',
+       "DELETE FROM tab1 WHERE a IN (3, 5)");
+$node_publisher->safe_psql('postgres',
+       "DELETE FROM tab1_2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1_1");
+is($result, qq(0||), 'delete from tab1_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1_2");
+is($result, qq(0||), 'delete from tab1_2 replicated');
+
+# truncate
+$node_subscriber1->safe_psql('postgres',
+       "INSERT INTO tab1 VALUES (1), (2), (5)");
+$node_subscriber2->safe_psql('postgres',
+       "INSERT INTO tab1_2 VALUES (2)");
+$node_publisher->safe_psql('postgres',
+       "TRUNCATE tab1_2");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(2|1|2), 'truncate of tab1_2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1_2");
+is($result, qq(0||), 'truncate of tab1_2 replicated');
+
+$node_publisher->safe_psql('postgres',
+       "TRUNCATE tab1");
+
+$node_publisher->wait_for_catchup('sub1');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'truncate of tab1_1 replicated');
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT count(*), min(a), max(a) FROM tab1");
+is($result, qq(0||), 'truncate of tab1_1 replicated');