static void
logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
{
- LogicalRepRelMapEntry *entry;
+ LogicalRepPartMapEntry *entry;
/* Just to be sure. */
if (LogicalRepPartMap == NULL)
hash_seq_init(&status, LogicalRepPartMap);
/* TODO, use inverse lookup hashtable? */
- while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+ while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
{
- if (entry->localreloid == reloid)
+ if (entry->relmapentry.localreloid == reloid)
{
- entry->localrelvalid = false;
+ entry->relmapentry.localrelvalid = false;
hash_seq_term(&status);
break;
}
hash_seq_init(&status, LogicalRepPartMap);
- while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
- entry->localrelvalid = false;
+ while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
+ entry->relmapentry.localrelvalid = false;
}
}
Oid partOid = RelationGetRelid(partrel);
AttrMap *attrmap = root->attrmap;
bool found;
- int i;
MemoryContext oldctx;
if (LogicalRepPartMap == NULL)
(void *) &partOid,
HASH_ENTER, &found);
- if (found)
- return &part_entry->relmapentry;
+ entry = &part_entry->relmapentry;
- memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+ if (found && entry->localrelvalid)
+ return entry;
/* Switch to longer-lived context. */
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
- part_entry->partoid = partOid;
+ if (!found)
+ {
+ memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
+ part_entry->partoid = partOid;
+ }
- /* Remote relation is copied as-is from the root entry. */
- entry = &part_entry->relmapentry;
- entry->remoterel.remoteid = remoterel->remoteid;
- entry->remoterel.nspname = pstrdup(remoterel->nspname);
- entry->remoterel.relname = pstrdup(remoterel->relname);
- entry->remoterel.natts = remoterel->natts;
- entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
- entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
- for (i = 0; i < remoterel->natts; i++)
+ if (!entry->remoterel.remoteid)
{
- entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
- entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+ int i;
+
+ /* Remote relation is copied as-is from the root entry. */
+ entry = &part_entry->relmapentry;
+ entry->remoterel.remoteid = remoterel->remoteid;
+ entry->remoterel.nspname = pstrdup(remoterel->nspname);
+ entry->remoterel.relname = pstrdup(remoterel->relname);
+ entry->remoterel.natts = remoterel->natts;
+ entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
+ entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
+ for (i = 0; i < remoterel->natts; i++)
+ {
+ entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
+ entry->remoterel.atttyps[i] = remoterel->atttyps[i];
+ }
+ entry->remoterel.replident = remoterel->replident;
+ entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
}
- entry->remoterel.replident = remoterel->replident;
- entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
entry->localrel = partrel;
entry->localreloid = partOid;
{
AttrNumber root_attno = map->attnums[attno];
- entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+ /* 0 means it's a dropped attribute. See comments atop AttrMap. */
+ if (root_attno == 0)
+ entry->attrmap->attnums[attno] = -1;
+ else
+ entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
}
}
else
qr/logical replication did not find row to be deleted in replication target relation "tab2_1"/,
'delete target row is missing in tab2_1');
-# No need for this until more tests are added.
-# $node_subscriber1->append_conf('postgresql.conf',
-# "log_min_messages = warning");
-# $node_subscriber1->reload;
+$node_subscriber1->append_conf('postgresql.conf',
+ "log_min_messages = warning");
+$node_subscriber1->reload;
+
+# Test that replication continues to work correctly after altering the
+# partition of a partitioned target table.
+
+$node_publisher->safe_psql(
+ 'postgres', q{
+ CREATE TABLE tab5 (a int NOT NULL, b int);
+ CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+ ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;});
+
+$node_subscriber2->safe_psql(
+ 'postgres', q{
+ CREATE TABLE tab5 (a int NOT NULL, b int, c int) PARTITION BY LIST (a);
+ CREATE TABLE tab5_1 PARTITION OF tab5 DEFAULT;
+ CREATE UNIQUE INDEX tab5_a_idx ON tab5 (a);
+ ALTER TABLE tab5 REPLICA IDENTITY USING INDEX tab5_a_idx;
+ ALTER TABLE tab5_1 REPLICA IDENTITY USING INDEX tab5_1_a_idx;});
+
+$node_subscriber2->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub2 REFRESH PUBLICATION");
+
+$node_subscriber2->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+# Make partition map cache
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 1)");
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 2 WHERE a = 1");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b FROM tab5 ORDER BY 1");
+is($result, qq(2|1), 'updates of tab5 replicated correctly');
+
+# Change the column order of partition on subscriber
+$node_subscriber2->safe_psql(
+ 'postgres', q{
+ ALTER TABLE tab5 DETACH PARTITION tab5_1;
+ ALTER TABLE tab5_1 DROP COLUMN b;
+ ALTER TABLE tab5_1 ADD COLUMN b int;
+ ALTER TABLE tab5 ATTACH PARTITION tab5_1 DEFAULT});
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+ "SELECT a, b, c FROM tab5 ORDER BY 1");
+is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
done_testing();