Add logical replication support to replicate into partitioned tables
author Peter Eisentraut <peter@eisentraut.org>
Mon, 6 Apr 2020 13:15:52 +0000 (15:15 +0200)
committer Peter Eisentraut <peter@eisentraut.org>
Mon, 6 Apr 2020 13:15:52 +0000 (15:15 +0200)
Mainly, this adds support code in logical/worker.c for applying
replicated operations whose target is a partitioned table to its
relevant partitions.
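
As a usage sketch (hypothetical object names, not taken from this
commit), a leaf partition published on the publisher can now be
subscribed to by a partitioned table:

    -- publisher
    CREATE TABLE tab (a int PRIMARY KEY) PARTITION BY LIST (a);
    CREATE TABLE tab_1 PARTITION OF tab FOR VALUES IN (1, 2, 3);
    CREATE PUBLICATION pub FOR TABLE tab_1;

    -- subscriber: tab may be partitioned differently; the apply worker
    -- routes each replicated row to the matching local partition
    CREATE TABLE tab (a int PRIMARY KEY) PARTITION BY LIST (a);
    CREATE TABLE tab_1 PARTITION OF tab FOR VALUES IN (1, 2, 3, 4);
    CREATE SUBSCRIPTION sub
        CONNECTION 'host=publisher dbname=postgres'
        PUBLICATION pub;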

Author: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Peter Eisentraut <peter.eisentraut@2ndquadrant.com>
Reviewed-by: Petr Jelinek <petr@2ndquadrant.com>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com

doc/src/sgml/logical-replication.sgml
src/backend/executor/execReplication.c
src/backend/replication/logical/relation.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/logicalrelation.h
src/test/subscription/t/013_partition.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 8bd7c9c8ac0941c8fb4290c3c90b32d53d451043..c513621470afce8149d3bb056eca962ccb141efa 100644
 
    <listitem>
     <para>
-     Replication is only supported by tables, partitioned or not, although a
-     given table must either be partitioned on both servers or not partitioned
-     at all.  Also, when replicating between partitioned tables, the actual
-     replication occurs between leaf partitions, so partitions on the two
-     servers must match one-to-one.
+     Replication is only supported by tables, including partitioned tables.
+     Attempts to replicate other types of relations, such as views,
+     materialized views, or foreign tables, will result in an error.
     </para>
+   </listitem>
 
+   <listitem>
     <para>
-     Attempts to replicate other types of relations such as views, materialized
-     views, or foreign tables, will result in an error.
+     When replicating between partitioned tables, the actual replication
+     originates from the leaf partitions on the publisher, so partitions on
+     the publisher must also exist on the subscriber as valid target tables.
+     (They could either be leaf partitions themselves, or they could be
+     further subpartitioned, or they could even be independent tables.)
     </para>
    </listitem>
   </itemizedlist>
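
As a sketch of the documentation text above (names mirror the
regression test at the bottom of this commit), a leaf partition
published from the publisher may map to any valid target table on the
subscriber:

    -- option 1: a leaf partition with matching bounds
    CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6);

    -- option 2: a further subpartitioned partition
    CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)
        PARTITION BY LIST (a);

    -- option 3: an independent, unpartitioned table
    CREATE TABLE tab1_2 (a int PRIMARY KEY, b text, c text);
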
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 7194becfd99bc26047083d8f72dbf52063fe1a2d..dc8a01a5cd51672e874baeeaf62492cc601d2135 100644
@@ -594,17 +594,9 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
                                                 const char *relname)
 {
        /*
-        * We currently only support writing to regular tables.  However, give a
-        * more specific error for partitioned and foreign tables.
+        * Give a more specific error for foreign tables.
         */
-       if (relkind == RELKIND_PARTITIONED_TABLE)
-               ereport(ERROR,
-                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                errmsg("cannot use relation \"%s.%s\" as logical replication target",
-                                               nspname, relname),
-                                errdetail("\"%s.%s\" is a partitioned table.",
-                                                  nspname, relname)));
-       else if (relkind == RELKIND_FOREIGN_TABLE)
+       if (relkind == RELKIND_FOREIGN_TABLE)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("cannot use relation \"%s.%s\" as logical replication target",
@@ -612,7 +604,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname,
                                 errdetail("\"%s.%s\" is a foreign table.",
                                                   nspname, relname)));
 
-       if (relkind != RELKIND_RELATION)
+       if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
                ereport(ERROR,
                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                 errmsg("cannot use relation \"%s.%s\" as logical replication target",
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 3d7291b97034d401bb2e4a437049e1e0504ea43f..2e7b755aebbd6dffdca3709f1deae8a5a087399a 100644
@@ -35,6 +35,24 @@ static MemoryContext LogicalRepRelMapContext = NULL;
 static HTAB *LogicalRepRelMap = NULL;
 static HTAB *LogicalRepTypMap = NULL;
 
+/*
+ * Partition map (LogicalRepPartMap)
+ *
+ * When a partitioned table is used as a replication target, replicated
+ * operations are actually performed on its leaf partitions, which requires
+ * the partitions to also be mapped to the remote relation.  The parent's
+ * entry (LogicalRepRelMapEntry) cannot be used as-is for all partitions,
+ * because individual partitions may have different attribute numbers, which
+ * means the attribute mappings to the remote relation's attributes must be
+ * maintained separately for each partition.
+ */
+static MemoryContext LogicalRepPartMapContext = NULL;
+static HTAB *LogicalRepPartMap = NULL;
+typedef struct LogicalRepPartMapEntry
+{
+       Oid                     partoid;                /* LogicalRepPartMap's key */
+       LogicalRepRelMapEntry relmapentry;
+}                      LogicalRepPartMapEntry;
 
 /*
  * Relcache invalidation callback for our relation map cache.
@@ -472,3 +490,174 @@ logicalrep_typmap_gettypname(Oid remoteid)
        Assert(OidIsValid(entry->remoteid));
        return psprintf("%s.%s", entry->nspname, entry->typname);
 }
+
+/*
+ * Partition cache: look up partition LogicalRepRelMapEntry's
+ *
+ * Unlike the relation map cache, this is keyed by partition OID, not remote
+ * relation OID, because we only have to use this cache in the case where
+ * partitions are not directly mapped to any remote relation, such as when
+ * replication is occurring with one of their ancestors as the target.
+ */
+
+/*
+ * Relcache invalidation callback
+ */
+static void
+logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
+{
+       LogicalRepPartMapEntry *part_entry;
+
+       /* Just to be sure. */
+       if (LogicalRepPartMap == NULL)
+               return;
+
+       if (reloid != InvalidOid)
+       {
+               HASH_SEQ_STATUS status;
+
+               hash_seq_init(&status, LogicalRepPartMap);
+
+               /* TODO, use inverse lookup hashtable? */
+               while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+               {
+                       /* The hash entries are LogicalRepPartMapEntry, not bare
+                        * LogicalRepRelMapEntry, so look inside relmapentry. */
+                       if (part_entry->relmapentry.localreloid == reloid)
+                       {
+                               part_entry->relmapentry.localreloid = InvalidOid;
+                               hash_seq_term(&status);
+                               break;
+                       }
+               }
+       }
+       else
+       {
+               /* invalidate all cache entries */
+               HASH_SEQ_STATUS status;
+
+               hash_seq_init(&status, LogicalRepPartMap);
+
+               while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+                       part_entry->relmapentry.localreloid = InvalidOid;
+       }
+       }
+}
+
+/*
+ * Initialize the partition map cache.
+ */
+static void
+logicalrep_partmap_init(void)
+{
+       HASHCTL         ctl;
+
+       if (!LogicalRepPartMapContext)
+               LogicalRepPartMapContext =
+                       AllocSetContextCreate(CacheMemoryContext,
+                                                                 "LogicalRepPartMapContext",
+                                                                 ALLOCSET_DEFAULT_SIZES);
+
+       /* Initialize the relation hash table. */
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = sizeof(Oid);      /* partition OID */
+       ctl.entrysize = sizeof(LogicalRepPartMapEntry);
+       ctl.hcxt = LogicalRepPartMapContext;
+
+       LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
+                                                                       HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+       /* Watch for invalidation events. */
+       CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
+                                                                 (Datum) 0);
+}
+
+/*
+ * logicalrep_partition_open
+ *
+ * The returned entry reuses most of the values of the root table's entry,
+ * save the attribute map, which can be different for the partition.
+ *
+ * Note there's no logicalrep_partition_close, because the caller closes the
+ * component relation.
+ */
+LogicalRepRelMapEntry *
+logicalrep_partition_open(LogicalRepRelMapEntry *root,
+                                                 Relation partrel, AttrMap *map)
+{
+       LogicalRepRelMapEntry *entry;
+       LogicalRepPartMapEntry *part_entry;
+       LogicalRepRelation *remoterel = &root->remoterel;
+       Oid                     partOid = RelationGetRelid(partrel);
+       AttrMap    *attrmap = root->attrmap;
+       bool            found;
+       int                     i;
+       MemoryContext oldctx;
+
+       if (LogicalRepPartMap == NULL)
+               logicalrep_partmap_init();
+
+       /* Search for existing entry. */
+       part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
+                                                                                                               (void *) &partOid,
+                                                                                                               HASH_ENTER, &found);
+
+       if (found)
+               return &part_entry->relmapentry;
+
+       memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+
+       /* Switch to longer-lived context. */
+       oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
+
+       part_entry->partoid = partOid;
+
+       /* Remote relation is used as-is from the root entry. */
+       entry = &part_entry->relmapentry;
+       entry->remoterel.remoteid = remoterel->remoteid;
+       entry->remoterel.nspname = pstrdup(remoterel->nspname);
+       entry->remoterel.relname = pstrdup(remoterel->relname);
+       entry->remoterel.natts = remoterel->natts;
+       entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+       entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+       for (i = 0; i < remoterel->natts; i++)
+       {
+               entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+               entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+       }
+       entry->remoterel.replident = remoterel->replident;
+       entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
+
+       entry->localrel = partrel;
+       entry->localreloid = partOid;
+
+       /*
+        * If the partition's attributes don't match the root relation's, we'll
+        * need to make a new attrmap which maps partition attribute numbers to
+        * remoterel's, instead of the original, which maps the root relation's
+        * attribute numbers to remoterel's.
+        *
+        * Note that 'map' which comes from the tuple routing data structure
+        * contains 1-based attribute numbers (of the parent relation).  However,
+        * the map in 'entry', a logical replication data structure, contains
+        * 0-based attribute numbers (of the remote relation).
+        */
+       if (map)
+       {
+               AttrNumber      attno;
+
+               entry->attrmap = make_attrmap(map->maplen);
+               for (attno = 0; attno < entry->attrmap->maplen; attno++)
+               {
+                       AttrNumber      root_attno = map->attnums[attno];
+
+                       entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+               }
+       }
+       else
+               entry->attrmap = attrmap;
+
+       entry->updatable = root->updatable;
+
+       /* state and statelsn are left set to 0. */
+       MemoryContextSwitchTo(oldctx);
+
+       return entry;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index a60c6661538a23e1b0a2add219daeec6d01510d5..c27d97058955a9c6023afb71b8017648cd740ea3 100644
@@ -762,7 +762,6 @@ copy_table(Relation rel)
        /* Map the publisher relation to local one. */
        relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock);
        Assert(rel == relmapentry->localrel);
-       Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION);
 
        /* Start copy on the publisher. */
        initStringInfo(&cmd);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 673ebd211d1832c00a72423f9666acc850a651b7..a752a1224d6fef6f168198e12436a10a9a8fa7b3 100644
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
+#include "catalog/partition.h"
+#include "catalog/pg_inherits.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
+#include "executor/execPartition.h"
 #include "executor/nodeModifyTable.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
@@ -126,6 +129,12 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
                                                                        LogicalRepRelation *remoterel,
                                                                        TupleTableSlot *remoteslot,
                                                                        TupleTableSlot **localslot);
+static void apply_handle_tuple_routing(ResultRelInfo *relinfo,
+                                                                          EState *estate,
+                                                                          TupleTableSlot *remoteslot,
+                                                                          LogicalRepTupleData *newtup,
+                                                                          LogicalRepRelMapEntry *relmapentry,
+                                                                          CmdType operation);
 
 /*
  * Should this worker apply changes for given relation.
@@ -636,9 +645,13 @@ apply_handle_insert(StringInfo s)
        slot_fill_defaults(rel, estate, remoteslot);
        MemoryContextSwitchTo(oldctx);
 
-       Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
-       apply_handle_insert_internal(estate->es_result_relation_info, estate,
-                                                                remoteslot);
+       /* For a partitioned table, insert the tuple into a partition. */
+       if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+                                                                  remoteslot, NULL, rel, CMD_INSERT);
+       else
+               apply_handle_insert_internal(estate->es_result_relation_info, estate,
+                                                                        remoteslot);
 
        PopActiveSnapshot();
 
@@ -767,9 +780,13 @@ apply_handle_update(StringInfo s)
                                                has_oldtup ? oldtup.values : newtup.values);
        MemoryContextSwitchTo(oldctx);
 
-       Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
-       apply_handle_update_internal(estate->es_result_relation_info, estate,
-                                                                remoteslot, &newtup, rel);
+       /* For a partitioned table, apply update to correct partition. */
+       if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+                                                                  remoteslot, &newtup, rel, CMD_UPDATE);
+       else
+               apply_handle_update_internal(estate->es_result_relation_info, estate,
+                                                                        remoteslot, &newtup, rel);
 
        PopActiveSnapshot();
 
@@ -886,9 +903,13 @@ apply_handle_delete(StringInfo s)
        slot_store_cstrings(remoteslot, rel, oldtup.values);
        MemoryContextSwitchTo(oldctx);
 
-       Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION);
-       apply_handle_delete_internal(estate->es_result_relation_info, estate,
-                                                                remoteslot, &rel->remoterel);
+       /* For a partitioned table, apply delete to correct partition. */
+       if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               apply_handle_tuple_routing(estate->es_result_relation_info, estate,
+                                                                  remoteslot, NULL, rel, CMD_DELETE);
+       else
+               apply_handle_delete_internal(estate->es_result_relation_info, estate,
+                                                                        remoteslot, &rel->remoterel);
 
        PopActiveSnapshot();
 
@@ -975,6 +996,235 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
        return found;
 }
 
+/*
+ * This handles insert, update, and delete on a partitioned table.
+ */
+static void
+apply_handle_tuple_routing(ResultRelInfo *relinfo,
+                                                  EState *estate,
+                                                  TupleTableSlot *remoteslot,
+                                                  LogicalRepTupleData *newtup,
+                                                  LogicalRepRelMapEntry *relmapentry,
+                                                  CmdType operation)
+{
+       Relation        parentrel = relinfo->ri_RelationDesc;
+       ModifyTableState *mtstate = NULL;
+       PartitionTupleRouting *proute = NULL;
+       ResultRelInfo *partrelinfo;
+       Relation        partrel;
+       TupleTableSlot *remoteslot_part;
+       PartitionRoutingInfo *partinfo;
+       TupleConversionMap *map;
+       MemoryContext oldctx;
+
+       /* ModifyTableState is needed for ExecFindPartition(). */
+       mtstate = makeNode(ModifyTableState);
+       mtstate->ps.plan = NULL;
+       mtstate->ps.state = estate;
+       mtstate->operation = operation;
+       mtstate->resultRelInfo = relinfo;
+       proute = ExecSetupPartitionTupleRouting(estate, mtstate, parentrel);
+
+       /*
+        * Find the partition to which the "search tuple" belongs.
+        */
+       Assert(remoteslot != NULL);
+       oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+       partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
+                                                                       remoteslot, estate);
+       Assert(partrelinfo != NULL);
+       partrel = partrelinfo->ri_RelationDesc;
+
+       /*
+        * To perform any of the operations below, the tuple must match the
+        * partition's rowtype. Convert if needed or just copy, using a dedicated
+        * slot to store the tuple in any case.
+        */
+       partinfo = partrelinfo->ri_PartitionInfo;
+       remoteslot_part = partinfo->pi_PartitionTupleSlot;
+       if (remoteslot_part == NULL)
+               remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable);
+       map = partinfo->pi_RootToPartitionMap;
+       if (map != NULL)
+               remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot,
+                                                                                               remoteslot_part);
+       else
+       {
+               remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
+               slot_getallattrs(remoteslot_part);
+       }
+       MemoryContextSwitchTo(oldctx);
+
+       estate->es_result_relation_info = partrelinfo;
+       switch (operation)
+       {
+               case CMD_INSERT:
+                       apply_handle_insert_internal(partrelinfo, estate,
+                                                                                remoteslot_part);
+                       break;
+
+               case CMD_DELETE:
+                       apply_handle_delete_internal(partrelinfo, estate,
+                                                                                remoteslot_part,
+                                                                                &relmapentry->remoterel);
+                       break;
+
+               case CMD_UPDATE:
+
+                       /*
+                        * For UPDATE, depending on whether or not the updated tuple
+                        * satisfies the partition's constraint, perform a simple UPDATE
+                        * of the partition or move the updated tuple into a different
+                        * suitable partition.
+                        */
+                       {
+                               AttrMap    *attrmap = map ? map->attrMap : NULL;
+                               LogicalRepRelMapEntry *part_entry;
+                               TupleTableSlot *localslot;
+                               ResultRelInfo *partrelinfo_new;
+                               bool            found;
+
+                               part_entry = logicalrep_partition_open(relmapentry, partrel,
+                                                                                                          attrmap);
+
+                               /* Get the matching local tuple from the partition. */
+                               found = FindReplTupleInLocalRel(estate, partrel,
+                                                                                               &part_entry->remoterel,
+                                                                                               remoteslot_part, &localslot);
+
+                               oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+                               if (found)
+                               {
+                                       /* Apply the update.  */
+                                       slot_modify_cstrings(remoteslot_part, localslot,
+                                                                                part_entry,
+                                                                                newtup->values, newtup->changed);
+                                       MemoryContextSwitchTo(oldctx);
+                               }
+                               else
+                               {
+                                       /*
+                                        * The tuple to be updated could not be found.
+                                        *
+                                        * TODO what to do here, change the log level to LOG
+                                        * perhaps?
+                                        */
+                                       elog(DEBUG1,
+                                                "logical replication did not find row for update "
+                                                "in replication target relation \"%s\"",
+                                                RelationGetRelationName(partrel));
+                               }
+
+                               /*
+                                * Does the updated tuple still satisfy the current
+                                * partition's constraint?
+                                */
+                               if (partrelinfo->ri_PartitionCheck == NULL ||
+                                       ExecPartitionCheck(partrelinfo, remoteslot_part, estate,
+                                                                          false))
+                               {
+                                       /*
+                                        * Yes, so simply UPDATE the partition.  We don't call
+                                        * apply_handle_update_internal() here, even though it
+                                        * would normally do this work, to avoid repeating the
+                                        * lookup of the local tuple in the partition that was
+                                        * already done above.
+                                        */
+                                       EPQState        epqstate;
+
+                                       EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
+                                       ExecOpenIndices(partrelinfo, false);
+
+                                       EvalPlanQualSetSlot(&epqstate, remoteslot_part);
+                                       ExecSimpleRelationUpdate(estate, &epqstate, localslot,
+                                                                                        remoteslot_part);
+                                       ExecCloseIndices(partrelinfo);
+                                       EvalPlanQualEnd(&epqstate);
+                               }
+                               else
+                               {
+                                       /* Move the tuple into the new partition. */
+
+                                       /*
+                                        * New partition will be found using tuple routing, which
+                                        * can only occur via the parent table.  We might need to
+                                        * convert the tuple to the parent's rowtype.  Note that
+                                        * this is the tuple found in the partition, not the
+                                        * original search tuple received by this function.
+                                        */
+                                       if (map)
+                                       {
+                                               TupleConversionMap *PartitionToRootMap =
+                                               convert_tuples_by_name(RelationGetDescr(partrel),
+                                                                                          RelationGetDescr(parentrel));
+
+                                               remoteslot =
+                                                       execute_attr_map_slot(PartitionToRootMap->attrMap,
+                                                                                                 remoteslot_part, remoteslot);
+                                       }
+                                       else
+                                       {
+                                               remoteslot = ExecCopySlot(remoteslot, remoteslot_part);
+                                               slot_getallattrs(remoteslot);
+                                       }
+
+                                       /* Find the new partition. */
+                                       oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+                                       partrelinfo_new = ExecFindPartition(mtstate, relinfo,
+                                                                                                               proute, remoteslot,
+                                                                                                               estate);
+                                       MemoryContextSwitchTo(oldctx);
+                                       Assert(partrelinfo_new != partrelinfo);
+
+                                       /* DELETE old tuple found in the old partition. */
+                                       estate->es_result_relation_info = partrelinfo;
+                                       apply_handle_delete_internal(partrelinfo, estate,
+                                                                                                localslot,
+                                                                                                &relmapentry->remoterel);
+
+                                       /* INSERT new tuple into the new partition. */
+
+                                       /*
+                                        * Convert the replacement tuple to match the destination
+                                        * partition rowtype.
+                                        */
+                                       oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+                                       partrel = partrelinfo_new->ri_RelationDesc;
+                                       partinfo = partrelinfo_new->ri_PartitionInfo;
+                                       remoteslot_part = partinfo->pi_PartitionTupleSlot;
+                                       if (remoteslot_part == NULL)
+                                               remoteslot_part = table_slot_create(partrel,
+                                                                                                                       &estate->es_tupleTable);
+                                       map = partinfo->pi_RootToPartitionMap;
+                                       if (map != NULL)
+                                       {
+                                               remoteslot_part = execute_attr_map_slot(map->attrMap,
+                                                                                                                               remoteslot,
+                                                                                                                               remoteslot_part);
+                                       }
+                                       else
+                                       {
+                                               remoteslot_part = ExecCopySlot(remoteslot_part,
+                                                                                                          remoteslot);
+                                               slot_getallattrs(remoteslot_part);
+                                       }
+                                       MemoryContextSwitchTo(oldctx);
+                                       estate->es_result_relation_info = partrelinfo_new;
+                                       apply_handle_insert_internal(partrelinfo_new, estate,
+                                                                                                remoteslot_part);
+                               }
+                       }
+                       break;
+
+               default:
+                       elog(ERROR, "unrecognized CmdType: %d", (int) operation);
+                       break;
+       }
+
+       ExecCleanupTupleRouting(mtstate, proute);
+}
+
 /*
  * Handle TRUNCATE message.
  *
@@ -988,6 +1238,7 @@ apply_handle_truncate(StringInfo s)
        List       *remote_relids = NIL;
        List       *remote_rels = NIL;
        List       *rels = NIL;
+       List       *part_rels = NIL;
        List       *relids = NIL;
        List       *relids_logged = NIL;
        ListCell   *lc;
@@ -1017,6 +1268,47 @@ apply_handle_truncate(StringInfo s)
                relids = lappend_oid(relids, rel->localreloid);
                if (RelationIsLogicallyLogged(rel->localrel))
                        relids_logged = lappend_oid(relids_logged, rel->localreloid);
+
+               /*
+                * Truncate partitions if we got a message to truncate a partitioned
+                * table.
+                */
+               if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               {
+                       ListCell   *child;
+                       List       *children = find_all_inheritors(rel->localreloid,
+                                                                                                          RowExclusiveLock,
+                                                                                                          NULL);
+
+                       foreach(child, children)
+                       {
+                               Oid                     childrelid = lfirst_oid(child);
+                               Relation        childrel;
+
+                               if (list_member_oid(relids, childrelid))
+                                       continue;
+
+                               /* find_all_inheritors already got lock */
+                               childrel = table_open(childrelid, NoLock);
+
+                               /*
+                                * Ignore temp tables of other backends.  See similar code in
+                                * ExecuteTruncate().
+                                */
+                               if (RELATION_IS_OTHER_TEMP(childrel))
+                               {
+                                       table_close(childrel, RowExclusiveLock);
+                                       continue;
+                               }
+
+                               rels = lappend(rels, childrel);
+                               part_rels = lappend(part_rels, childrel);
+                               relids = lappend_oid(relids, childrelid);
+                               /* Log this relation only if needed for logical decoding */
+                               if (RelationIsLogicallyLogged(childrel))
+                                       relids_logged = lappend_oid(relids_logged, childrelid);
+                       }
+               }
        }
 
        /*
@@ -1032,6 +1324,12 @@ apply_handle_truncate(StringInfo s)
 
                logicalrep_rel_close(rel, NoLock);
        }
+       foreach(lc, part_rels)
+       {
+               Relation        rel = lfirst(lc);
+
+               table_close(rel, NoLock);
+       }
 
        CommandCounterIncrement();
 }
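
A brief sketch of the TRUNCATE handling added above (objects as in the
regression test below):

    -- truncating a published leaf partition replicates as a truncate
    -- of just that relation
    TRUNCATE tab1_2;

    -- truncating the partitioned parent is applied on the subscriber
    -- by expanding the target list to all local partitions (via
    -- find_all_inheritors), just as a local TRUNCATE would
    TRUNCATE tab1;
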
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 9971a8028caa1f38bbe0ddf9760400d03dbae2b5..4650b4f9e1b351ed53501328369e13d9c010e9b7 100644
@@ -34,6 +34,8 @@ extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
                                                                                                  LOCKMODE lockmode);
+extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
+                                                                                                 Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
                                                                 LOCKMODE lockmode);
 
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index ea5812ce189bd30bfddf18fd63839b75c7fb17ce..5db1b21c594a264c72b9b2a077eaa3e4cdc8fb00 100644
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 15;
+use Test::More tests => 24;
 
 # setup
 
@@ -33,29 +33,49 @@ $node_publisher->safe_psql('postgres',
 $node_publisher->safe_psql('postgres',
        "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
 $node_publisher->safe_psql('postgres',
-       "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)");
+       "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (4, 5, 6)");
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab1_def PARTITION OF tab1 DEFAULT");
 $node_publisher->safe_psql('postgres',
        "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1");
 
 # subscriber1
+#
+# This is partitioned differently from the publisher.  tab1_2 is
+# subpartitioned.  This tests the tuple routing code on the
+# subscriber.
 $node_subscriber1->safe_psql('postgres',
-       "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)");
+       "CREATE TABLE tab1 (c text, a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
 $node_subscriber1->safe_psql('postgres',
        "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)");
+
+$node_subscriber1->safe_psql('postgres',
+       "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)");
 $node_subscriber1->safe_psql('postgres',
-       "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)");
+       "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (4, 5, 6) PARTITION BY LIST (a)");
 $node_subscriber1->safe_psql('postgres',
-       "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)");
+       "CREATE TABLE tab1_2_1 (c text, b text, a int NOT NULL)");
+$node_subscriber1->safe_psql('postgres',
+       "ALTER TABLE tab1_2 ATTACH PARTITION tab1_2_1 FOR VALUES IN (5)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (4, 6)");
+$node_subscriber1->safe_psql('postgres',
+       "CREATE TABLE tab1_def PARTITION OF tab1 (c DEFAULT 'sub1_tab1') DEFAULT");
 $node_subscriber1->safe_psql('postgres',
        "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1");
 
 # subscriber 2
+#
+# This does not use partitioning.  The tables match the leaf tables on
+# the publisher.
 $node_subscriber2->safe_psql('postgres',
        "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)");
 $node_subscriber2->safe_psql('postgres',
        "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)");
 $node_subscriber2->safe_psql('postgres',
        "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)");
+$node_subscriber2->safe_psql('postgres',
+       "CREATE TABLE tab1_def (a int PRIMARY KEY, b text, c text DEFAULT 'sub2_tab1_def')");
 $node_subscriber2->safe_psql('postgres',
        "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all");
 
@@ -74,59 +94,122 @@ $node_publisher->safe_psql('postgres',
        "INSERT INTO tab1_1 (a) VALUES (3)");
 $node_publisher->safe_psql('postgres',
        "INSERT INTO tab1_2 VALUES (5)");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab1 VALUES (0)");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
 
 my $result = $node_subscriber1->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
-is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated');
+       "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub1_tab1|0
+sub1_tab1|1
+sub1_tab1|3
+sub1_tab1|5), 'inserts into tab1 and its partitions replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab1_2_1 ORDER BY 1");
+is($result, qq(5), 'inserts into tab1_2 replicated into tab1_2_1 correctly');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(), 'inserts into tab1_2 replicated into tab1_2_2 correctly');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_1|1
+sub2_tab1_1|3), 'inserts into tab1_1 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
-is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated');
+       "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|5), 'inserts into tab1_2 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
-is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated');
+       "SELECT c, a FROM tab1_def ORDER BY 1, 2");
+is($result, qq(sub2_tab1_def|0), 'inserts into tab1_def replicated');
 
-# update (no partition change)
+# update (replicated as update)
 $node_publisher->safe_psql('postgres',
        "UPDATE tab1 SET a = 2 WHERE a = 1");
+# All of the following cause an update to be applied to a partitioned
+# table on subscriber1: tab1_2 is a leaf partition on the publisher,
+# whereas it's sub-partitioned on subscriber1.
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 6 WHERE a = 5");
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 4 WHERE a = 6");
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 6 WHERE a = 4");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
 
 $result = $node_subscriber1->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
-is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated');
+       "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub1_tab1|0
+sub1_tab1|2
+sub1_tab1|3
+sub1_tab1|6), 'update of tab1_1, tab1_2 replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab1_2_1 ORDER BY 1");
+is($result, qq(), 'updates of tab1_2 replicated into tab1_2_1 correctly');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(6), 'updates of tab1_2 replicated into tab1_2_2 correctly');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_1|2
+sub2_tab1_1|3), 'update of tab1_1 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
-is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated');
+       "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|6), 'tab1_2 updated');
 
-# update (partition changes)
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab1_def ORDER BY 1");
+is($result, qq(sub2_tab1_def|0), 'tab1_def unchanged');
+
+# update (replicated as delete+insert)
 $node_publisher->safe_psql('postgres',
-       "UPDATE tab1 SET a = 6 WHERE a = 2");
+       "UPDATE tab1 SET a = 1 WHERE a = 0");
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab1 SET a = 4 WHERE a = 1");
 
 $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
 
 $result = $node_subscriber1->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1");
-is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated');
+       "SELECT c, a FROM tab1 ORDER BY 1, 2");
+is($result, qq(sub1_tab1|2
+sub1_tab1|3
+sub1_tab1|4
+sub1_tab1|6), 'update of tab1 (delete from tab1_def + insert into tab1_1) replicated');
+
+$result = $node_subscriber1->safe_psql('postgres',
+       "SELECT a FROM tab1_2_2 ORDER BY 1");
+is($result, qq(4
+6), 'updates of tab1 (delete + insert) replicated into tab1_2_2 correctly');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT c, a FROM tab1_1 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_1|2
+sub2_tab1_1|3), 'tab1_1 unchanged');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1");
-is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated');
+       "SELECT c, a FROM tab1_2 ORDER BY 1, 2");
+is($result, qq(sub2_tab1_2|4
+sub2_tab1_2|6), 'insert into tab1_2 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1");
-is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated');
+       "SELECT a FROM tab1_def ORDER BY 1");
+is($result, qq(), 'delete from tab1_def replicated');
 
 # delete
 $node_publisher->safe_psql('postgres',
-       "DELETE FROM tab1 WHERE a IN (3, 5)");
+       "DELETE FROM tab1 WHERE a IN (2, 3, 5)");
 $node_publisher->safe_psql('postgres',
        "DELETE FROM tab1_2");
 
@@ -134,22 +217,22 @@ $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
 
 $result = $node_subscriber1->safe_psql('postgres',
-       "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated');
+       "SELECT a FROM tab1");
+is($result, qq(), 'delete from tab1_1, tab1_2 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT count(*), min(a), max(a) FROM tab1_1");
-is($result, qq(0||), 'delete from tab1_1 replicated');
+       "SELECT a FROM tab1_1");
+is($result, qq(), 'delete from tab1_1 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT count(*), min(a), max(a) FROM tab1_2");
-is($result, qq(0||), 'delete from tab1_2 replicated');
+       "SELECT a FROM tab1_2");
+is($result, qq(), 'delete from tab1_2 replicated');
 
 # truncate
 $node_subscriber1->safe_psql('postgres',
-       "INSERT INTO tab1 VALUES (1), (2), (5)");
+       "INSERT INTO tab1 (a) VALUES (1), (2), (5)");
 $node_subscriber2->safe_psql('postgres',
-       "INSERT INTO tab1_2 VALUES (2)");
+       "INSERT INTO tab1_2 (a) VALUES (2)");
 $node_publisher->safe_psql('postgres',
        "TRUNCATE tab1_2");
 
@@ -157,12 +240,13 @@ $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
 
 $result = $node_subscriber1->safe_psql('postgres',
-       "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(2|1|2), 'truncate of tab1_2 replicated');
+       "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(1
+2), 'truncate of tab1_2 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT count(*), min(a), max(a) FROM tab1_2");
-is($result, qq(0||), 'truncate of tab1_2 replicated');
+       "SELECT a FROM tab1_2 ORDER BY 1");
+is($result, qq(), 'truncate of tab1_2 replicated');
 
 $node_publisher->safe_psql('postgres',
        "TRUNCATE tab1");
@@ -171,8 +255,8 @@ $node_publisher->wait_for_catchup('sub1');
 $node_publisher->wait_for_catchup('sub2');
 
 $result = $node_subscriber1->safe_psql('postgres',
-       "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(0||), 'truncate of tab1_1 replicated');
+       "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(), 'truncate of tab1_1 replicated');
 $result = $node_subscriber2->safe_psql('postgres',
-       "SELECT count(*), min(a), max(a) FROM tab1");
-is($result, qq(0||), 'truncate of tab1_1 replicated');
+       "SELECT a FROM tab1 ORDER BY 1");
+is($result, qq(), 'truncate of tab1 replicated');