Allow publishing partition changes via ancestors
authorPeter Eisentraut <peter@eisentraut.org>
Wed, 8 Apr 2020 07:59:27 +0000 (09:59 +0200)
committerPeter Eisentraut <peter@eisentraut.org>
Wed, 8 Apr 2020 09:19:23 +0000 (11:19 +0200)
To control whether partition changes are replicated using their own
identity and schema or an ancestor's, add a new parameter that can be
set per publication named 'publish_via_partition_root'.

This allows replicating a partitioned table into a different partition
structure on the subscriber.

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com

15 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/ref/create_publication.sgml
src/backend/catalog/pg_publication.c
src/backend/commands/publicationcmds.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/utils/cache/relcache.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/include/catalog/catversion.h
src/include/catalog/pg_publication.h
src/test/regress/expected/publication.out
src/test/regress/sql/publication.sql
src/test/subscription/t/013_partition.pl

index 0d61d98b1153ea9772634e8b459706f072996896..386c6d7bd1b65b287d6c5d5036422c3ae9604cac 100644 (file)
@@ -5437,6 +5437,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>If true, <command>TRUNCATE</command> operations are replicated for
        tables in the publication.</entry>
      </row>
+
+     <row>
+      <entry><structfield>pubviaroot</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>If true, operations on a leaf partition are replicated using the
+       identity and schema of its topmost partitioned ancestor mentioned in the
+       publication instead of its own.
+      </entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index c513621470afce8149d3bb056eca962ccb141efa..eba331a72b584cdce79e8e2a87c0a844cbc09aa6 100644 (file)
    <listitem>
     <para>
      When replicating between partitioned tables, the actual replication
-     originates from the leaf partitions on the publisher, so partitions on
-     the publisher must also exist on the subscriber as valid target tables.
-     (They could either be leaf partitions themselves, or they could be
-     further subpartitioned, or they could even be independent tables.)
+     originates, by default, from the leaf partitions on the publisher, so
+     partitions on the publisher must also exist on the subscriber as valid
+     target tables. (They could either be leaf partitions themselves, or they
+     could be further subpartitioned, or they could even be independent
+     tables.)  Publications can also specify that changes are to be replicated
+     using the identity and schema of the partitioned root table instead of
+     that of the individual leaf partitions in which the changes actually
+     originate (see <xref linkend="sql-createpublication"/>).
     </para>
    </listitem>
   </itemizedlist>
index 597cb28f3397dce294e13c05ba47512aea9f85fc..2c52a8aada1725b13166360bf0e966489ea893ec 100644 (file)
@@ -123,6 +123,26 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
          </para>
         </listitem>
        </varlistentry>
+
+       <varlistentry>
+        <term><literal>publish_via_partition_root</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          This parameter determines whether changes in a partitioned table (or
+          on its partitions) contained in the publication will be published
+          using the identity and schema of the partitioned table rather than
+          that of the individual partitions that are actually changed; the
+          latter is the default.  Enabling this allows the changes to be
+          replicated into a non-partitioned table or a partitioned table
+          consisting of a different set of partitions.
+         </para>
+
+         <para>
+          If this is enabled, <literal>TRUNCATE</literal> operations performed
+          directly on partitions are not replicated.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
 
      </para>
index 500a5ae1ee0b9c453a5f5c4e7527ae4fccb08860..68f6887b383a175bfb1706891c03e5c12d3e2675 100644 (file)
@@ -42,8 +42,6 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
-static List *get_rel_publications(Oid relid);
-
 /*
  * Check if relation can be in given publication and throws appropriate
  * error if not.
@@ -216,37 +214,9 @@ publication_add_relation(Oid pubid, Relation targetrel,
        return myself;
 }
 
-
-/*
- * Gets list of publication oids for a relation, plus those of ancestors,
- * if any, if the relation is a partition.
- */
+/* Gets list of publication oids for a relation */
 List *
 GetRelationPublications(Oid relid)
-{
-       List       *result = NIL;
-
-       result = get_rel_publications(relid);
-       if (get_rel_relispartition(relid))
-       {
-               List       *ancestors = get_partition_ancestors(relid);
-               ListCell   *lc;
-
-               foreach(lc, ancestors)
-               {
-                       Oid                     ancestor = lfirst_oid(lc);
-                       List       *ancestor_pubs = get_rel_publications(ancestor);
-
-                       result = list_concat(result, ancestor_pubs);
-               }
-       }
-
-       return result;
-}
-
-/* Workhorse of GetRelationPublications() */
-static List *
-get_rel_publications(Oid relid)
 {
        List       *result = NIL;
        CatCList   *pubrellist;
@@ -373,9 +343,13 @@ GetAllTablesPublications(void)
 
 /*
  * Gets list of all relation published by FOR ALL TABLES publication(s).
+ *
+ * If the publication publishes partition changes via their respective root
+ * partitioned tables, we must exclude partitions in favor of including the
+ * root partitioned tables.
  */
 List *
-GetAllTablesPublicationRelations(void)
+GetAllTablesPublicationRelations(bool pubviaroot)
 {
        Relation        classRel;
        ScanKeyData key[1];
@@ -397,12 +371,35 @@ GetAllTablesPublicationRelations(void)
                Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
                Oid                     relid = relForm->oid;
 
-               if (is_publishable_class(relid, relForm))
+               if (is_publishable_class(relid, relForm) &&
+                       !(relForm->relispartition && pubviaroot))
                        result = lappend_oid(result, relid);
        }
 
        table_endscan(scan);
-       table_close(classRel, AccessShareLock);
+
+       if (pubviaroot)
+       {
+               ScanKeyInit(&key[0],
+                                       Anum_pg_class_relkind,
+                                       BTEqualStrategyNumber, F_CHAREQ,
+                                       CharGetDatum(RELKIND_PARTITIONED_TABLE));
+
+               scan = table_beginscan_catalog(classRel, 1, key);
+
+               while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+               {
+                       Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
+                       Oid                     relid = relForm->oid;
+
+                       if (is_publishable_class(relid, relForm) &&
+                               !relForm->relispartition)
+                               result = lappend_oid(result, relid);
+               }
+
+               table_endscan(scan);
+               table_close(classRel, AccessShareLock);
+       }
 
        return result;
 }
@@ -433,6 +430,7 @@ GetPublication(Oid pubid)
        pub->pubactions.pubupdate = pubform->pubupdate;
        pub->pubactions.pubdelete = pubform->pubdelete;
        pub->pubactions.pubtruncate = pubform->pubtruncate;
+       pub->pubviaroot = pubform->pubviaroot;
 
        ReleaseSysCache(tup);
 
@@ -533,9 +531,11 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
                 * need those.
                 */
                if (publication->alltables)
-                       tables = GetAllTablesPublicationRelations();
+                       tables = GetAllTablesPublicationRelations(publication->pubviaroot);
                else
                        tables = GetPublicationRelations(publication->oid,
+                                                                                        publication->pubviaroot ?
+                                                                                        PUBLICATION_PART_ROOT :
                                                                                         PUBLICATION_PART_LEAF);
                funcctx->user_fctx = (void *) tables;
 
index 494c0bdc282c1eb7352c124e38a9c38950cbf252..771268f70a260bbea0009a5f8691fc1e2a1198d9 100644 (file)
@@ -23,6 +23,7 @@
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
@@ -56,20 +57,21 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
 static void
 parse_publication_options(List *options,
                                                  bool *publish_given,
-                                                 bool *publish_insert,
-                                                 bool *publish_update,
-                                                 bool *publish_delete,
-                                                 bool *publish_truncate)
+                                                 PublicationActions *pubactions,
+                                                 bool *publish_via_partition_root_given,
+                                                 bool *publish_via_partition_root)
 {
        ListCell   *lc;
 
        *publish_given = false;
+       *publish_via_partition_root_given = false;
 
-       /* Defaults are true */
-       *publish_insert = true;
-       *publish_update = true;
-       *publish_delete = true;
-       *publish_truncate = true;
+       /* defaults */
+       pubactions->pubinsert = true;
+       pubactions->pubupdate = true;
+       pubactions->pubdelete = true;
+       pubactions->pubtruncate = true;
+       *publish_via_partition_root = false;
 
        /* Parse options */
        foreach(lc, options)
@@ -91,10 +93,10 @@ parse_publication_options(List *options,
                         * If publish option was given only the explicitly listed actions
                         * should be published.
                         */
-                       *publish_insert = false;
-                       *publish_update = false;
-                       *publish_delete = false;
-                       *publish_truncate = false;
+                       pubactions->pubinsert = false;
+                       pubactions->pubupdate = false;
+                       pubactions->pubdelete = false;
+                       pubactions->pubtruncate = false;
 
                        *publish_given = true;
                        publish = defGetString(defel);
@@ -110,19 +112,28 @@ parse_publication_options(List *options,
                                char       *publish_opt = (char *) lfirst(lc);
 
                                if (strcmp(publish_opt, "insert") == 0)
-                                       *publish_insert = true;
+                                       pubactions->pubinsert = true;
                                else if (strcmp(publish_opt, "update") == 0)
-                                       *publish_update = true;
+                                       pubactions->pubupdate = true;
                                else if (strcmp(publish_opt, "delete") == 0)
-                                       *publish_delete = true;
+                                       pubactions->pubdelete = true;
                                else if (strcmp(publish_opt, "truncate") == 0)
-                                       *publish_truncate = true;
+                                       pubactions->pubtruncate = true;
                                else
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_SYNTAX_ERROR),
                                                         errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt)));
                        }
                }
+               else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
+               {
+                       if (*publish_via_partition_root_given)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("conflicting or redundant options")));
+                       *publish_via_partition_root_given = true;
+                       *publish_via_partition_root = defGetBoolean(defel);
+               }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -143,10 +154,9 @@ CreatePublication(CreatePublicationStmt *stmt)
        Datum           values[Natts_pg_publication];
        HeapTuple       tup;
        bool            publish_given;
-       bool            publish_insert;
-       bool            publish_update;
-       bool            publish_delete;
-       bool            publish_truncate;
+       PublicationActions pubactions;
+       bool            publish_via_partition_root_given;
+       bool            publish_via_partition_root;
        AclResult       aclresult;
 
        /* must have CREATE privilege on database */
@@ -183,9 +193,9 @@ CreatePublication(CreatePublicationStmt *stmt)
        values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
 
        parse_publication_options(stmt->options,
-                                                         &publish_given, &publish_insert,
-                                                         &publish_update, &publish_delete,
-                                                         &publish_truncate);
+                                                         &publish_given, &pubactions,
+                                                         &publish_via_partition_root_given,
+                                                         &publish_via_partition_root);
 
        puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
                                                                Anum_pg_publication_oid);
@@ -193,13 +203,15 @@ CreatePublication(CreatePublicationStmt *stmt)
        values[Anum_pg_publication_puballtables - 1] =
                BoolGetDatum(stmt->for_all_tables);
        values[Anum_pg_publication_pubinsert - 1] =
-               BoolGetDatum(publish_insert);
+               BoolGetDatum(pubactions.pubinsert);
        values[Anum_pg_publication_pubupdate - 1] =
-               BoolGetDatum(publish_update);
+               BoolGetDatum(pubactions.pubupdate);
        values[Anum_pg_publication_pubdelete - 1] =
-               BoolGetDatum(publish_delete);
+               BoolGetDatum(pubactions.pubdelete);
        values[Anum_pg_publication_pubtruncate - 1] =
-               BoolGetDatum(publish_truncate);
+               BoolGetDatum(pubactions.pubtruncate);
+       values[Anum_pg_publication_pubviaroot - 1] =
+               BoolGetDatum(publish_via_partition_root);
 
        tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
@@ -251,17 +263,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
        bool            replaces[Natts_pg_publication];
        Datum           values[Natts_pg_publication];
        bool            publish_given;
-       bool            publish_insert;
-       bool            publish_update;
-       bool            publish_delete;
-       bool            publish_truncate;
+       PublicationActions pubactions;
+       bool            publish_via_partition_root_given;
+       bool            publish_via_partition_root;
        ObjectAddress obj;
        Form_pg_publication pubform;
 
        parse_publication_options(stmt->options,
-                                                         &publish_given, &publish_insert,
-                                                         &publish_update, &publish_delete,
-                                                         &publish_truncate);
+                                                         &publish_given, &pubactions,
+                                                         &publish_via_partition_root_given,
+                                                         &publish_via_partition_root);
 
        /* Everything ok, form a new tuple. */
        memset(values, 0, sizeof(values));
@@ -270,19 +281,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 
        if (publish_given)
        {
-               values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert);
+               values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
                replaces[Anum_pg_publication_pubinsert - 1] = true;
 
-               values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update);
+               values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
                replaces[Anum_pg_publication_pubupdate - 1] = true;
 
-               values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete);
+               values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
                replaces[Anum_pg_publication_pubdelete - 1] = true;
 
-               values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate);
+               values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
                replaces[Anum_pg_publication_pubtruncate - 1] = true;
        }
 
+       if (publish_via_partition_root_given)
+       {
+               values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
+               replaces[Anum_pg_publication_pubviaroot - 1] = true;
+       }
+
        tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
                                                        replaces);
 
index 552a70cffa5590261d2366a2f3b3218fd9a241e2..5fbf2d4367b14f2d25613136082d70a61930926d 100644 (file)
@@ -12,6 +12,8 @@
  */
 #include "postgres.h"
 
+#include "access/tupconvert.h"
+#include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "fmgr.h"
 #include "replication/logical.h"
@@ -20,6 +22,7 @@
 #include "replication/pgoutput.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -49,6 +52,7 @@ static bool publications_valid;
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
                                                                                uint32 hashvalue);
+static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
@@ -59,9 +63,31 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
 typedef struct RelationSyncEntry
 {
        Oid                     relid;                  /* relation oid */
-       bool            schema_sent;    /* did we send the schema? */
+
+       /*
+        * Did we send the schema?  If ancestor relid is set, its schema must also
+        * have been sent for this to be true.
+        */
+       bool            schema_sent;
+
        bool            replicate_valid;
        PublicationActions pubactions;
+
+       /*
+        * OID of the relation to publish changes as.  For a partition, this may
+        * be set to one of its ancestors whose schema will be used when
+        * replicating changes, if publish_via_partition_root is set for the
+        * publication.
+        */
+       Oid                     publish_as_relid;
+
+       /*
+        * Map used when replicating using an ancestor's schema to convert tuples
+        * from partition's type to the ancestor's; NULL if publish_as_relid is
+        * same as 'relid' or if unnecessary due to partition and the ancestor
+        * having identical TupleDesc.
+        */
+       TupleConversionMap *map;
 } RelationSyncEntry;
 
 /* Map used to remember which relation schemas we sent. */
@@ -259,47 +285,71 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 }
 
 /*
- * Write the relation schema if the current schema hasn't been sent yet.
+ * Write the current schema of the relation and its ancestor (if any) if not
+ * done yet.
  */
 static void
 maybe_send_schema(LogicalDecodingContext *ctx,
                                  Relation relation, RelationSyncEntry *relentry)
 {
-       if (!relentry->schema_sent)
+       if (relentry->schema_sent)
+               return;
+
+       /* If needed, send the ancestor's schema first. */
+       if (relentry->publish_as_relid != RelationGetRelid(relation))
        {
-               TupleDesc       desc;
-               int                     i;
+               Relation        ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+               TupleDesc       indesc = RelationGetDescr(relation);
+               TupleDesc       outdesc = RelationGetDescr(ancestor);
+               MemoryContext oldctx;
+
+               /* Map must live as long as the session does. */
+               oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+               relentry->map = convert_tuples_by_name(indesc, outdesc);
+               MemoryContextSwitchTo(oldctx);
+               send_relation_and_attrs(ancestor, ctx);
+               RelationClose(ancestor);
+       }
 
-               desc = RelationGetDescr(relation);
+       send_relation_and_attrs(relation, ctx);
+       relentry->schema_sent = true;
+}
 
-               /*
-                * Write out type info if needed.  We do that only for user-created
-                * types.  We use FirstGenbkiObjectId as the cutoff, so that we only
-                * consider objects with hand-assigned OIDs to be "built in", not for
-                * instance any function or type defined in the information_schema.
-                * This is important because only hand-assigned OIDs can be expected
-                * to remain stable across major versions.
-                */
-               for (i = 0; i < desc->natts; i++)
-               {
-                       Form_pg_attribute att = TupleDescAttr(desc, i);
+/*
+ * Send type info for the relation's user-created column types, then its schema
+ */
+static void
+send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+{
+       TupleDesc       desc = RelationGetDescr(relation);
+       int                     i;
 
-                       if (att->attisdropped || att->attgenerated)
-                               continue;
+       /*
+        * Write out type info if needed.  We do that only for user-created types.
+        * We use FirstGenbkiObjectId as the cutoff, so that we only consider
+        * objects with hand-assigned OIDs to be "built in", not for instance any
+        * function or type defined in the information_schema. This is important
+        * because only hand-assigned OIDs can be expected to remain stable across
+        * major versions.
+        */
+       for (i = 0; i < desc->natts; i++)
+       {
+               Form_pg_attribute att = TupleDescAttr(desc, i);
 
-                       if (att->atttypid < FirstGenbkiObjectId)
-                               continue;
+               if (att->attisdropped || att->attgenerated)
+                       continue;
 
-                       OutputPluginPrepareWrite(ctx, false);
-                       logicalrep_write_typ(ctx->out, att->atttypid);
-                       OutputPluginWrite(ctx, false);
-               }
+               if (att->atttypid < FirstGenbkiObjectId)
+                       continue;
 
                OutputPluginPrepareWrite(ctx, false);
-               logicalrep_write_rel(ctx->out, relation);
+               logicalrep_write_typ(ctx->out, att->atttypid);
                OutputPluginWrite(ctx, false);
-               relentry->schema_sent = true;
        }
+
+       OutputPluginPrepareWrite(ctx, false);
+       logicalrep_write_rel(ctx->out, relation);
+       OutputPluginWrite(ctx, false);
 }
 
 /*
@@ -346,28 +396,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        switch (change->action)
        {
                case REORDER_BUFFER_CHANGE_INSERT:
-                       OutputPluginPrepareWrite(ctx, true);
-                       logicalrep_write_insert(ctx->out, relation,
-                                                                       &change->data.tp.newtuple->tuple);
-                       OutputPluginWrite(ctx, true);
-                       break;
+                       {
+                               HeapTuple       tuple = &change->data.tp.newtuple->tuple;
+
+                               /* Switch relation if publishing via root. */
+                               if (relentry->publish_as_relid != RelationGetRelid(relation))
+                               {
+                                       Assert(relation->rd_rel->relispartition);
+                                       relation = RelationIdGetRelation(relentry->publish_as_relid);
+                                       /* Convert tuple if needed. */
+                                       if (relentry->map)
+                                               tuple = execute_attr_map_tuple(tuple, relentry->map);
+                               }
+
+                               OutputPluginPrepareWrite(ctx, true);
+                               logicalrep_write_insert(ctx->out, relation, tuple);
+                               OutputPluginWrite(ctx, true);
+                               break;
+                       }
                case REORDER_BUFFER_CHANGE_UPDATE:
                        {
                                HeapTuple       oldtuple = change->data.tp.oldtuple ?
                                &change->data.tp.oldtuple->tuple : NULL;
+                               HeapTuple       newtuple = &change->data.tp.newtuple->tuple;
+
+                               /* Switch relation if publishing via root. */
+                               if (relentry->publish_as_relid != RelationGetRelid(relation))
+                               {
+                                       Assert(relation->rd_rel->relispartition);
+                                       relation = RelationIdGetRelation(relentry->publish_as_relid);
+                                       /* Convert tuples if needed. */
+                                       if (relentry->map)
+                                       {
+                                               oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+                                               newtuple = execute_attr_map_tuple(newtuple, relentry->map);
+                                       }
+                               }
 
                                OutputPluginPrepareWrite(ctx, true);
-                               logicalrep_write_update(ctx->out, relation, oldtuple,
-                                                                               &change->data.tp.newtuple->tuple);
+                               logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
                                OutputPluginWrite(ctx, true);
                                break;
                        }
                case REORDER_BUFFER_CHANGE_DELETE:
                        if (change->data.tp.oldtuple)
                        {
+                               HeapTuple       oldtuple = &change->data.tp.oldtuple->tuple;
+
+                               /* Switch relation if publishing via root. */
+                               if (relentry->publish_as_relid != RelationGetRelid(relation))
+                               {
+                                       Assert(relation->rd_rel->relispartition);
+                                       relation = RelationIdGetRelation(relentry->publish_as_relid);
+                                       /* Convert tuple if needed. */
+                                       if (relentry->map)
+                                               oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+                               }
+
                                OutputPluginPrepareWrite(ctx, true);
-                               logicalrep_write_delete(ctx->out, relation,
-                                                                               &change->data.tp.oldtuple->tuple);
+                               logicalrep_write_delete(ctx->out, relation, oldtuple);
                                OutputPluginWrite(ctx, true);
                        }
                        else
@@ -412,10 +499,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                        continue;
 
                /*
-                * Don't send partitioned tables, because partitions should be sent
-                * instead.
+                * Don't send partitions if the publication wants to send only the
+                * root tables through it.
                 */
-               if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               if (relation->rd_rel->relispartition &&
+                       relentry->publish_as_relid != relid)
                        continue;
 
                relids[nrelids++] = relid;
@@ -540,12 +628,15 @@ init_rel_sync_cache(MemoryContext cachectx)
  * This looks up publications that the given relation is directly or
  * indirectly part of (the latter if it's really the relation's ancestor that
  * is part of a publication) and fills up the found entry with the information
- * about which operations to publish.
+ * about which operations to publish and whether to use an ancestor's schema
+ * when publishing.
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Oid relid)
 {
        RelationSyncEntry *entry;
+       bool            am_partition = get_rel_relispartition(relid);
+       char            relkind = get_rel_relkind(relid);
        bool            found;
        MemoryContext oldctx;
 
@@ -564,6 +655,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
        {
                List       *pubids = GetRelationPublications(relid);
                ListCell   *lc;
+               Oid                     publish_as_relid = relid;
 
                /* Reload publications if needed before use. */
                if (!publications_valid)
@@ -588,8 +680,56 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                foreach(lc, data->publications)
                {
                        Publication *pub = lfirst(lc);
+                       bool            publish = false;
+
+                       if (pub->alltables)
+                       {
+                               publish = true;
+                               if (pub->pubviaroot && am_partition)
+                                       publish_as_relid = llast_oid(get_partition_ancestors(relid));
+                       }
+
+                       if (!publish)
+                       {
+                               bool    ancestor_published = false;
+
+                               /*
+                                * For a partition, check if any of the ancestors are
+                                * published.  If so, note down the topmost ancestor that is
+                                * published via this publication, which will be used as the
+                                * relation via which to publish the partition's changes.
+                                */
+                               if (am_partition)
+                               {
+                                       List   *ancestors = get_partition_ancestors(relid);
+                                       ListCell *lc2;
+
+                                       /* Find the "topmost" ancestor that is in this publication. */
+                                       foreach(lc2, ancestors)
+                                       {
+                                               Oid             ancestor = lfirst_oid(lc2);
+
+                                               if (list_member_oid(GetRelationPublications(ancestor),
+                                                                                       pub->oid))
+                                               {
+                                                       ancestor_published = true;
+                                                       if (pub->pubviaroot)
+                                                               publish_as_relid = ancestor;
+                                               }
+                                       }
+                               }
+
+                               if (list_member_oid(pubids, pub->oid) || ancestor_published)
+                                       publish = true;
+                       }
 
-                       if (pub->alltables || list_member_oid(pubids, pub->oid))
+                       /*
+                        * Don't publish changes for a partitioned table, because
+                        * publishing those of its partitions suffices, unless partition
+                        * changes won't be published due to pubviaroot being set.
+                        */
+                       if (publish &&
+                               (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot))
                        {
                                entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
                                entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
@@ -604,6 +744,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 
                list_free(pubids);
 
+               entry->publish_as_relid = publish_as_relid;
                entry->replicate_valid = true;
        }
 
index dfd81f1320e350db37c077e0577265824d5c8d53..9f1f11d0c144cf116699806912f9df70264dfb97 100644 (file)
@@ -44,6 +44,7 @@
 #include "catalog/catalog.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
+#include "catalog/partition.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
@@ -5314,6 +5315,20 @@ GetRelationPublicationActions(Relation relation)
 
        /* Fetch the publication membership info. */
        puboids = GetRelationPublications(RelationGetRelid(relation));
+       if (relation->rd_rel->relispartition)
+       {
+               /* Add publications that the ancestors are in too. */
+               List   *ancestors = get_partition_ancestors(RelationGetRelid(relation));
+               ListCell *lc;
+
+               foreach(lc, ancestors)
+               {
+                       Oid             ancestor = lfirst_oid(lc);
+
+                       puboids = list_concat_unique_oid(puboids,
+                                                                                        GetRelationPublications(ancestor));
+               }
+       }
        puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
 
        foreach(lc, puboids)
index 408637cfec4ef31bdc0d89238399e89a391fd3ba..c579227b1974899d029753bb92560da86cdc039c 100644 (file)
@@ -3868,6 +3868,7 @@ getPublications(Archive *fout)
        int                     i_pubupdate;
        int                     i_pubdelete;
        int                     i_pubtruncate;
+       int                     i_pubviaroot;
        int                     i,
                                ntups;
 
@@ -3879,18 +3880,25 @@ getPublications(Archive *fout)
        resetPQExpBuffer(query);
 
        /* Get the publications. */
-       if (fout->remoteVersion >= 110000)
+       if (fout->remoteVersion >= 130000)
+               appendPQExpBuffer(query,
+                                                 "SELECT p.tableoid, p.oid, p.pubname, "
+                                                 "(%s p.pubowner) AS rolname, "
+                                                 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+                                                 "FROM pg_publication p",
+                                                 username_subquery);
+       else if (fout->remoteVersion >= 110000)
                appendPQExpBuffer(query,
                                                  "SELECT p.tableoid, p.oid, p.pubname, "
                                                  "(%s p.pubowner) AS rolname, "
-                                                 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate "
+                                                 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
                                                  "FROM pg_publication p",
                                                  username_subquery);
        else
                appendPQExpBuffer(query,
                                                  "SELECT p.tableoid, p.oid, p.pubname, "
                                                  "(%s p.pubowner) AS rolname, "
-                                                 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate "
+                                                 "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
                                                  "FROM pg_publication p",
                                                  username_subquery);
 
@@ -3907,6 +3915,7 @@ getPublications(Archive *fout)
        i_pubupdate = PQfnumber(res, "pubupdate");
        i_pubdelete = PQfnumber(res, "pubdelete");
        i_pubtruncate = PQfnumber(res, "pubtruncate");
+       i_pubviaroot = PQfnumber(res, "pubviaroot");
 
        pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -3929,6 +3938,8 @@ getPublications(Archive *fout)
                        (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0);
                pubinfo[i].pubtruncate =
                        (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
+               pubinfo[i].pubviaroot =
+                       (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
 
                if (strlen(pubinfo[i].rolname) == 0)
                        pg_log_warning("owner of publication \"%s\" appears to be invalid",
@@ -4005,7 +4016,12 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo)
                first = false;
        }
 
-       appendPQExpBufferStr(query, "');\n");
+       appendPQExpBufferStr(query, "'");
+
+       if (pubinfo->pubviaroot)
+               appendPQExpBufferStr(query, ", publish_via_partition_root = true");
+
+       appendPQExpBufferStr(query, ");\n");
 
        ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId,
                                 ARCHIVE_OPTS(.tag = pubinfo->dobj.name,
index 3e11166615e105df7f0a9005cd1ee71c34744e4b..61c909e06d837e66bbb8f084acac7cceeeb8f067 100644 (file)
@@ -602,6 +602,7 @@ typedef struct _PublicationInfo
        bool            pubupdate;
        bool            pubdelete;
        bool            pubtruncate;
+       bool            pubviaroot;
 } PublicationInfo;
 
 /*
index 109245fea78c185201638f7c0bf68476da884f56..f05e914b4de52d9333f5b11b79078bc6c3d31d3d 100644 (file)
@@ -5707,7 +5707,7 @@ listPublications(const char *pattern)
        PQExpBufferData buf;
        PGresult   *res;
        printQueryOpt myopt = pset.popt;
-       static const bool translate_columns[] = {false, false, false, false, false, false, false};
+       static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
 
        if (pset.sversion < 100000)
        {
@@ -5738,6 +5738,10 @@ listPublications(const char *pattern)
                appendPQExpBuffer(&buf,
                                                  ",\n  pubtruncate AS \"%s\"",
                                                  gettext_noop("Truncates"));
+       if (pset.sversion >= 130000)
+               appendPQExpBuffer(&buf,
+                                                 ",\n  pubviaroot AS \"%s\"",
+                                                 gettext_noop("Via root"));
 
        appendPQExpBufferStr(&buf,
                                                 "\nFROM pg_catalog.pg_publication\n");
@@ -5779,6 +5783,7 @@ describePublications(const char *pattern)
        int                     i;
        PGresult   *res;
        bool            has_pubtruncate;
+       bool            has_pubviaroot;
 
        if (pset.sversion < 100000)
        {
@@ -5791,6 +5796,7 @@ describePublications(const char *pattern)
        }
 
        has_pubtruncate = (pset.sversion >= 110000);
+       has_pubviaroot = (pset.sversion >= 130000);
 
        initPQExpBuffer(&buf);
 
@@ -5801,6 +5807,9 @@ describePublications(const char *pattern)
        if (has_pubtruncate)
                appendPQExpBufferStr(&buf,
                                                         ", pubtruncate");
+       if (has_pubviaroot)
+               appendPQExpBufferStr(&buf,
+                                                        ", pubviaroot");
        appendPQExpBufferStr(&buf,
                                                 "\nFROM pg_catalog.pg_publication\n");
 
@@ -5850,6 +5859,8 @@ describePublications(const char *pattern)
 
                if (has_pubtruncate)
                        ncols++;
+               if (has_pubviaroot)
+                       ncols++;
 
                initPQExpBuffer(&title);
                printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -5862,6 +5873,8 @@ describePublications(const char *pattern)
                printTableAddHeader(&cont, gettext_noop("Deletes"), true, align);
                if (has_pubtruncate)
                        printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
+               if (has_pubviaroot)
+                       printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
 
                printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
                printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -5870,6 +5883,8 @@ describePublications(const char *pattern)
                printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false);
                if (has_pubtruncate)
                        printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
+               if (has_pubviaroot)
+                       printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
 
                if (!puballtables)
                {
index 27381d7874f259720a10839d83a990e200bccdcb..13bbddf785ec703993b724136e126985f504d2b3 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202004073
+#define CATALOG_VERSION_NO     202004074
 
 #endif
index bb52e8c5e0853262c15cd085900210225d90bfc8..ec02f48da0fa78a4092a69127bed535636d87ff0 100644 (file)
@@ -52,6 +52,8 @@ CATALOG(pg_publication,6104,PublicationRelationId)
        /* true if truncates are published */
        bool            pubtruncate;
 
+       /* true if partition changes are published using root schema */
+       bool            pubviaroot;
 } FormData_pg_publication;
 
 /* ----------------
@@ -74,6 +76,7 @@ typedef struct Publication
        Oid                     oid;
        char       *name;
        bool            alltables;
+       bool            pubviaroot;
        PublicationActions pubactions;
 } Publication;
 
@@ -99,7 +102,7 @@ typedef enum PublicationPartOpt
 
 extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
 extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(void);
+extern List *GetAllTablesPublicationRelations(bool pubviaroot);
 
 extern bool is_publishable_relation(Relation rel);
 extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
index 2634d2c1e142bf5affb4d11ca413456b4f62b361..63d6ab7a4ef265b3a5286453d85f05502e274e91 100644 (file)
@@ -25,21 +25,23 @@ CREATE PUBLICATION testpub_xxx WITH (foo);
 ERROR:  unrecognized publication parameter: "foo"
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
 ERROR:  unrecognized "publish" value: "cluster"
+CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
+ERROR:  conflicting or redundant options
 \dRp
-                                         List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f
+                                              List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                         List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------+--------------------------+------------+---------+---------+---------+-----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f
+                                              List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
 (2 rows)
 
 --- adding tables
@@ -83,10 +85,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                        Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | t          | t       | t       | f       | f
+                              Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | t          | t       | t       | f       | f         | f
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -98,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                              Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                    Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                              Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                    Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_tbl3"
 
@@ -129,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                          Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                               Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "public.testpub_parted"
 
@@ -143,6 +145,15 @@ HINT:  To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.
 ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 -- works again, because parent's publication is no longer considered
 UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
+\dRp+ testpub_forparted
+                               Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | t
+Tables:
+    "public.testpub_parted"
+
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 -- fail - view
@@ -159,10 +170,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                           Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | t
+                                 Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -200,10 +211,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                           Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | f
+                                Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | f         | f
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -247,10 +258,10 @@ DROP TABLE testpub_parted;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                           Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
---------------------------+------------+---------+---------+---------+-----------
- regress_publication_user | f          | t       | t       | t       | f
+                                Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | f         | f
 (1 row)
 
 -- fail - must be owner of publication
@@ -260,20 +271,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                     List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates 
--------------+--------------------------+------------+---------+---------+---------+-----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f
+                                           List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                        List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates 
------------------+---------------------------+------------+---------+---------+---------+-----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f
+                                             List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
 (1 row)
 
 DROP PUBLICATION testpub_default;
index 219e04129d2d9e8508c6888a9ba6ae9dfa88c5dd..d844075368d0c56c051beb5effab21383d3a8682 100644 (file)
@@ -23,6 +23,7 @@ ALTER PUBLICATION testpub_default SET (publish = update);
 -- error cases
 CREATE PUBLICATION testpub_xxx WITH (foo);
 CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum');
+CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
 
 \dRp
 
@@ -87,6 +88,8 @@ UPDATE testpub_parted1 SET a = 1;
 ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 -- works again, because parent's publication is no longer considered
 UPDATE testpub_parted1 SET a = 1;
+ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
+\dRp+ testpub_forparted
 DROP TABLE testpub_parted1;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
index 5db1b21c594a264c72b9b2a077eaa3e4cdc8fb00..208bb556ce41217636757ca33a2b559f52a95900 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 24;
+use Test::More tests => 51;
 
 # setup
 
@@ -48,7 +48,6 @@ $node_subscriber1->safe_psql('postgres',
        "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
 $node_subscriber1->safe_psql('postgres',
        "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
-
 $node_subscriber1->safe_psql('postgres',
        "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
 $node_subscriber1->safe_psql('postgres',
@@ -87,6 +86,8 @@ $node_subscriber1->poll_query_until('postgres', $synced_query)
 $node_subscriber2->poll_query_until('postgres', $synced_query)
   or die "Timed out while waiting for subscriber to synchronize data";
 
+# Tests for replication using leaf partition identity and schema
+
 # insert
 $node_publisher->safe_psql('postgres',
        "INSERT INTO tab1 VALUES (1)");
@@ -260,3 +261,296 @@ is($result, qq(), 'truncate of tab1_1 replicated');
 $result = $node_subscriber2->safe_psql('postgres',
        "SELECT a FROM tab1 ORDER BY 1");
 is($result, qq(), 'truncate of tab1 replicated');
+
+# Tests for replication using root table identity and schema
+
+# publisher
+$node_publisher->safe_psql('postgres',
+       "DROP PUBLICATION pub1");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab2_1 (b text, a int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+       "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (0, 1, 2, 3)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)");
+
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (0, 1, 2, 3, 5, 6)");
+$node_publisher->safe_psql('postgres',
+       "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)");
+# Note: tab3_1's parent is not in the publication, in which case its
+# changes are published using its own identity.
+$node_publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab3_1 WITH (publish_via_partition_root = true)");
+
+# subscriber 1
+$node_subscriber1->safe_psql('postgres',
+       "DROP SUBSCRIPTION sub1");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+       "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (0) TO (10)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub_viaroot CONNECTION '$publisher_connstr' PUBLICATION pub_viaroot");
+
+# subscriber 2
+$node_subscriber2->safe_psql('postgres',
+       "DROP TABLE tab1");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)");
+# Note: tab1's partitions are named tab1_1 and tab1_2 on the publisher.
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)");
+$node_subscriber2->safe_psql('postgres',
+       "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3', b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab3_1', b text)");
+# The publication that sub2 points to now publishes via root, so the
+# subscription's target relations must be refreshed.
+$node_subscriber2->safe_psql('postgres',
+       "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+# Wait for initial sync of all subscriptions
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1 VALUES (1), (0)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1_1 (a) VALUES (3)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab2 VALUES (1), (0), (3), (5)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab3 VALUES (1), (0), (3), (5)");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|5), 'inserts into tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|5), 'inserts into tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|5), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|5), 'inserts into tab3 replicated');
+
+# update (replicated as update)
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab2 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab3 SET a = 6 WHERE a = 5");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|3
+sub1_tab2|6), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|3
+sub1_tab3_1|6), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|3
+sub2_tab1|6), 'update of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|3
+sub2_tab2|6), 'update of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|3
+sub2_tab3|6), 'update of tab3 replicated');
+
+# update (replicated as delete+insert)
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab2 SET a = 2 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab3 SET a = 2 WHERE a = 6");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub1_tab2|0
+sub1_tab2|1
+sub1_tab2|2
+sub1_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT c, a FROM tab3_1 ORDER BY 1, 2");
+is($result, qq(sub1_tab3_1|0
+sub1_tab3_1|1
+sub1_tab3_1|2
+sub1_tab3_1|3), 'update of tab3_1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1|0
+sub2_tab1|1
+sub2_tab1|2
+sub2_tab1|3), 'update of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab2 ORDER BY 1, 2");
+is($result, qq(sub2_tab2|0
+sub2_tab2|1
+sub2_tab2|2
+sub2_tab2|3), 'update of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab3 ORDER BY 1, 2");
+is($result, qq(sub2_tab3|0
+sub2_tab3|1
+sub2_tab3|2
+sub2_tab3|3), 'update of tab3 replicated');
+
+# delete
+$node_publisher->safe_psql('postgres',
+       "DELETE FROM tab1");
+$node_publisher->safe_psql('postgres',
+       "DELETE FROM tab2");
+$node_publisher->safe_psql('postgres',
+       "DELETE FROM tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab2");
+is($result, qq(), 'delete from tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab1");
+is($result, qq(), 'delete from tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab2");
+is($result, qq(), 'delete from tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab3");
+is($result, qq(), 'delete from tab3 replicated');
+
+# truncate
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1 VALUES (1), (2), (5)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab2 VALUES (1), (2), (5)");
+# these will NOT be replicated
+$node_publisher->safe_psql('postgres',
+       "TRUNCATE tab1_2, tab2_1, tab3_1");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab2 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab1_2 NOT replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab2 ORDER BY 1");
+is($result, qq(1
+2
+5), 'truncate of tab2_1 NOT replicated');
+
+$node_publisher->safe_psql('postgres',
+       "TRUNCATE tab1, tab2, tab3");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab1");
+is($result, qq(), 'truncate of tab1 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab2");
+is($result, qq(), 'truncate of tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab3");
+is($result, qq(), 'truncate of tab3 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a FROM tab3_1");
+is($result, qq(), 'truncate of tab3_1 replicated');