Fix data inconsistency between publisher and subscriber.
authorAmit Kapila <akapila@postgresql.org>
Thu, 16 Jun 2022 03:02:10 +0000 (08:32 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 16 Jun 2022 03:02:10 +0000 (08:32 +0530)
We were not updating the partition map cache in the subscriber even when
the corresponding remote rel is changed. Due to this data was getting
incorrectly replicated for partition tables after the publisher has
changed the table schema.

Fix it by resetting the required entries in the partition map cache after
receiving a new relation mapping from the publisher.

Reported-by: Shi Yu
Author: Shi Yu, Hou Zhijie
Reviewed-by: Amit Langote, Amit Kapila
Backpatch-through: 13, where it was introduced
Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com

src/backend/replication/logical/relation.c
src/backend/replication/logical/worker.c
src/include/replication/logicalrelation.h
src/test/subscription/t/013_partition.pl

index 5f4689f18226fc90278fd73b8fc9ea9eb3ba9c46..5c7e9d11ac8f2a18c671ba9dd3742aeb5856e96b 100644 (file)
@@ -486,6 +486,40 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
    }
 }
 
+/*
+ * Reset the entries in the partition map that refer to remoterel.
+ *
+ * Called when new relation mapping is sent by the publisher to update our
+ * expected view of incoming data from said publisher.
+ *
+ * Note that we don't update the remoterel information in the entry here,
+ * we will update the information in logicalrep_partition_open to avoid
+ * unnecessary work.
+ */
+void
+logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
+{
+   HASH_SEQ_STATUS status;
+   LogicalRepPartMapEntry *part_entry;
+   LogicalRepRelMapEntry *entry;
+
+   if (LogicalRepPartMap == NULL)
+       return;
+
+   hash_seq_init(&status, LogicalRepPartMap);
+   while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+   {
+       entry = &part_entry->relmapentry;
+
+       if (entry->remoterel.remoteid != remoterel->remoteid)
+           continue;
+
+       logicalrep_relmap_free_entry(entry);
+
+       memset(entry, 0, sizeof(LogicalRepRelMapEntry));
+   }
+}
+
 /*
  * Initialize the partition map cache.
  */
index 833b2809d0bb5409be5d7c37a0d725b93551308d..bf97fa44ba2814bfba1955f48d038d889608dfca 100644 (file)
@@ -1191,6 +1191,9 @@ apply_handle_relation(StringInfo s)
 
    rel = logicalrep_read_rel(s);
    logicalrep_relmap_update(rel);
+
+   /* Also reset all entries in the partition map that refer to remoterel. */
+   logicalrep_partmap_reset_relmap(rel);
 }
 
 /*
index 3c662d3abcf811ae0488ea6c0da1f7819b165249..10f91490b5c8d6f8ea6ad690975519a3f810f282 100644 (file)
@@ -38,6 +38,7 @@ typedef struct LogicalRepRelMapEntry
 } LogicalRepRelMapEntry;
 
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
+extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
                                                  LOCKMODE lockmode);
index e53bc5b568f81606d2f6a28fc30faa9485760648..568e4d104e06e3718346c004fc00d5510310e8ae 100644 (file)
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 69;
+use Test::More tests => 70;
 
 # setup
 
@@ -841,3 +841,18 @@ $node_publisher->wait_for_catchup('sub2');
 $result = $node_subscriber2->safe_psql('postgres',
    "SELECT a, b, c FROM tab5 ORDER BY 1");
 is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
+
+# Test that replication into the partitioned target table continues to
+# work correctly when the published table is altered.
+$node_publisher->safe_psql(
+   'postgres', q{
+   ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT;
+   ALTER TABLE tab5 ADD COLUMN b INT;});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+   "SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');