Have logical replication subscriber fire column triggers
authorPeter Eisentraut <peter@eisentraut.org>
Mon, 6 Jan 2020 07:21:14 +0000 (08:21 +0100)
committerPeter Eisentraut <peter@eisentraut.org>
Mon, 6 Jan 2020 07:40:00 +0000 (08:40 +0100)
The logical replication apply worker did not fire per-column update
triggers because the updatedCols bitmap in the RTE was not populated.
This fixes that.

Reviewed-by: Euler Taveira <euler@timbira.com.br>
Discussion: https://www.postgresql.org/message-id/flat/21673e2d-597c-6afe-637e-e8b10425b240%402ndquadrant.com

src/backend/replication/logical/worker.c
src/test/subscription/t/003_constraints.pl

index 70db25ef16fd9cf1cd33cf37a977485dc0bf2e49..7a5471f95cb5448e3da0f22603c86dcdeccfb45f 100644 (file)
@@ -691,6 +691,7 @@ apply_handle_update(StringInfo s)
        bool            has_oldtup;
        TupleTableSlot *localslot;
        TupleTableSlot *remoteslot;
+       RangeTblEntry *target_rte;
        bool            found;
        MemoryContext oldctx;
 
@@ -721,6 +722,21 @@ apply_handle_update(StringInfo s)
                                                                  &estate->es_tupleTable);
        EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 
+       /*
+        * Populate updatedCols so that per-column triggers can fire.  This could
+        * include more columns than were actually changed on the publisher
+        * because the logical replication protocol doesn't contain that
+        * information.  But it would for example exclude columns that only exist
+        * on the subscriber, since we are not touching those.
+        */
+       target_rte = list_nth(estate->es_range_table, 0);
+       for (int i = 0; i < remoteslot->tts_tupleDescriptor->natts; i++)
+       {
+               if (newtup.changed[i])
+                       target_rte->updatedCols = bms_add_member(target_rte->updatedCols,
+                                                                                                        i + 1 - FirstLowInvalidHeapAttributeNumber);
+       }
+
        PushActiveSnapshot(GetTransactionSnapshot());
        ExecOpenIndices(estate->es_result_relation_info, false);
 
index 81547f65fa8504f533e71b3ff3373ca223d87e54..34ab11e7fed1fe4380901e44912a13e8f73d19b3 100644 (file)
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 4;
+use Test::More tests => 6;
 
 # Initialize publisher node
 my $node_publisher = get_new_node('publisher');
@@ -81,6 +81,8 @@ BEGIN
         ELSE
             RETURN NULL;
         END IF;
+    ELSIF (TG_OP = 'UPDATE') THEN
+        RETURN NULL;
     ELSE
         RAISE WARNING 'Unknown action';
         RETURN NULL;
@@ -88,7 +90,7 @@ BEGIN
 END;
 \$\$ LANGUAGE plpgsql;
 CREATE TRIGGER filter_basic_dml_trg
-    BEFORE INSERT ON tab_fk_ref
+    BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref
     FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
 ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;
 });
@@ -99,10 +101,32 @@ $node_publisher->safe_psql('postgres',
 
 $node_publisher->wait_for_catchup('tap_sub');
 
-# The row should be skipped on subscriber
+# The trigger should cause the insert to be skipped on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check replica insert trigger applied on subscriber');
+
+# Update data
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+# The trigger should cause the update to be skipped on subscriber
 $result = $node_subscriber->safe_psql('postgres',
        "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;");
-is($result, qq(2|1|2), 'check replica trigger applied on subscriber');
+is($result, qq(2|1|2), 'check replica update column trigger applied on subscriber');
+
+# Update on a column not specified in the trigger, but it will trigger
+# anyway because logical replication ships all columns in an update.
+$node_publisher->safe_psql('postgres',
+       "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*), min(id), max(id) FROM tab_fk_ref;");
+is($result, qq(2|1|2), 'check column trigger applied on even for other column');
 
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');