Handle heap rewrites even better in logical decoding
authorPeter Eisentraut <peter_e@gmx.net>
Wed, 21 Mar 2018 13:13:24 +0000 (09:13 -0400)
committerPeter Eisentraut <peter_e@gmx.net>
Wed, 21 Mar 2018 13:15:04 +0000 (09:15 -0400)
Logical decoding should not publish anything about tables created as
part of a heap rewrite during DDL.  Those tables don't exist externally,
so consumers of logical decoding cannot do anything sensible with that
information.  In ab28feae2bd3d4629bd73ae3548e671c57d785f0, we worked
around this for built-in logical replication, but that was hack.

This is a more proper fix: We mark such transient heaps using the new
field pg_class.relwrite, linking to the original relation OID.  By
default, we ignore them in logical decoding before they get to the
output plugin.  Optionally, a plugin can register their interest in
getting such changes, if they handle DDL specially, in which case the
new field will help them get information about the actual table.

Reviewed-by: Craig Ringer <craig@2ndquadrant.com>
20 files changed:
contrib/test_decoding/expected/concurrent_ddl_dml.out
contrib/test_decoding/expected/ddl.out
contrib/test_decoding/specs/concurrent_ddl_dml.spec
contrib/test_decoding/sql/ddl.sql
contrib/test_decoding/test_decoding.c
doc/src/sgml/catalogs.sgml
doc/src/sgml/logicaldecoding.sgml
src/backend/bootstrap/bootparse.y
src/backend/catalog/heap.c
src/backend/catalog/toasting.c
src/backend/commands/cluster.c
src/backend/commands/tablecmds.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/pgoutput/pgoutput.c
src/include/catalog/catversion.h
src/include/catalog/heap.h
src/include/catalog/pg_class.h
src/include/replication/output_plugin.h
src/include/replication/reorderbuffer.h

index a15bfa292ef76339bf6e77a9f1c8a1381093814a..1f9e7661b753967c54eb7f4f99adbdde90232610 100644 (file)
@@ -10,7 +10,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl2_float: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE float;
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -32,16 +32,13 @@ step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; <waitin
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl1_float: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1
 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1
 COMMIT         
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1
-COMMIT         
 ?column?       
 
 stop           
@@ -56,7 +53,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl2_char: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE character varying;
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -78,16 +75,13 @@ step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varyi
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl1_char: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1
 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1
 COMMIT         
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1'
-COMMIT         
 ?column?       
 
 stop           
@@ -103,16 +97,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; <waiting ...>
 step s1_commit: COMMIT;
 step s2_alter_tbl1_float: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1
 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1
 COMMIT         
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1
-COMMIT         
 ?column?       
 
 stop           
@@ -128,16 +119,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; <waiting ...>
 step s1_commit: COMMIT;
 step s2_alter_tbl1_char: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1
 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1
 COMMIT         
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1'
-COMMIT         
 ?column?       
 
 stop           
@@ -154,16 +142,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl1_float: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE float; <waiting ...>
 step s1_commit: COMMIT;
 step s2_alter_tbl1_float: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1
 table public.tbl2: INSERT: val1[integer]:1 val2[double precision]:1
 COMMIT         
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[double precision]:1
-COMMIT         
 ?column?       
 
 stop           
@@ -180,16 +165,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; <waiting ...>
 step s1_commit: COMMIT;
 step s2_alter_tbl1_char: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1
 table public.tbl2: INSERT: val1[integer]:1 val2[character varying]:'1'
 COMMIT         
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1'
-COMMIT         
 ?column?       
 
 stop           
@@ -205,7 +187,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl2_text: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE text;
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -229,16 +211,13 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl1_char: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE character varying; <waiting ...>
 step s1_commit: COMMIT;
 step s2_alter_tbl1_char: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.tbl1: INSERT: val1[integer]:1 val2[integer]:1
 table public.tbl2: INSERT: val1[integer]:1 val2[text]:'1'
 COMMIT         
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[character varying]:'1'
-COMMIT         
 ?column?       
 
 stop           
@@ -254,7 +233,7 @@ step s2_alter_tbl2_boolean: ALTER TABLE tbl2 ALTER COLUMN val2 TYPE boolean;
 ERROR:  column "val2" cannot be cast automatically to type boolean
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -279,7 +258,7 @@ step s2_alter_tbl1_boolean: ALTER TABLE tbl1 ALTER COLUMN val2 TYPE boolean; <wa
 step s1_commit: COMMIT;
 step s2_alter_tbl1_boolean: <... completed>
 error in steps s1_commit s2_alter_tbl1_boolean: ERROR:  column "val2" cannot be cast automatically to type boolean
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -300,7 +279,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER;
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -324,7 +303,7 @@ step s1_begin: BEGIN;
 step s2_alter_tbl2_add_int: ALTER TABLE tbl2 ADD COLUMN val3 INTEGER;
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -348,7 +327,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl2_add_float: ALTER TABLE tbl2 ADD COLUMN val3 FLOAT;
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -372,7 +351,7 @@ step s1_begin: BEGIN;
 step s2_alter_tbl2_add_float: ALTER TABLE tbl2 ADD COLUMN val3 FLOAT;
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -396,7 +375,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying;
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -420,7 +399,7 @@ step s1_begin: BEGIN;
 step s2_alter_tbl2_add_char: ALTER TABLE tbl2 ADD COLUMN val3 character varying;
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -445,7 +424,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; <waiting ...>
 step s1_commit: COMMIT;
 step s2_alter_tbl2_drop_3rd_col: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -468,7 +447,7 @@ step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl2_drop_3rd_col: <... completed>
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -493,7 +472,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3; <waiting ...>
 step s1_commit: COMMIT;
 step s2_alter_tbl2_drop_3rd_col: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -506,7 +485,7 @@ step s2_alter_tbl2_3rd_char: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE character v
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl2_3rd_char: <... completed>
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -515,14 +494,9 @@ table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[text]:'1'
 COMMIT         
 step s2_alter_tbl2_3rd_int: ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer;
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
-BEGIN          
-table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:null
-table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1
-table public.pg_temp: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1
-COMMIT         
 BEGIN          
 table public.tbl2: INSERT: val1[integer]:1 val2[integer]:1 val3[integer]:1
 COMMIT         
@@ -544,7 +518,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl2_3rd_text: <... completed>
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -573,7 +547,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl2_3rd_char: <... completed>
 step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -601,7 +575,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3;
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -628,7 +602,7 @@ step s1_insert_tbl2_3col: INSERT INTO tbl2 (val1, val2, val3) VALUES (1, 1, 1);
 step s1_commit: COMMIT;
 step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3;
 step s1_insert_tbl2: INSERT INTO tbl2 (val1, val2) VALUES (1, 1);
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
@@ -653,7 +627,7 @@ step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s2_alter_tbl2_drop_3rd_col: ALTER TABLE tbl2 DROP COLUMN val3;
 step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
 step s1_commit: COMMIT;
-step s2_get_changes: SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+step s2_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
index 1e22c1eefc16a1b666626178261cadbe4073cb43..b7c76469fc37ea5643c5f49ca5ab0ec4c372d6f9 100644 (file)
@@ -117,11 +117,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 (22 rows)
 
 ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
--- throw away changes, they contain oids
+-- check that this doesn't produce any changes from the heap rewrite
 SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
  count 
 -------
-    12
+     0
 (1 row)
 
 INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
@@ -192,16 +192,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (33 rows)
 
--- hide changes bc of oid visible in full table rewrites
 CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
 INSERT INTO tr_unique(data) VALUES(10);
 ALTER TABLE tr_unique RENAME TO tr_pkey;
 ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
-SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
- count 
--------
-     6
-(1 row)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
+                                    data                                     
+-----------------------------------------------------------------------------
+ BEGIN
+ table public.tr_unique: INSERT: id2[integer]:1 data[integer]:10
+ COMMIT
+ BEGIN
+ table public.tr_pkey: INSERT: id2[integer]:1 data[integer]:10 id[integer]:1
+ COMMIT
+(6 rows)
 
 INSERT INTO tr_pkey(data) VALUES(1);
 --show deletion with primary key
index 4a76532402a5c4a9065ac9a8650fbd1705e8de7a..e7cea37d307fbbb3e033b481d074aa9995e5924e 100644 (file)
@@ -53,7 +53,7 @@ step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE characte
 step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; }
 step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; }
 
-step "s2_get_changes" { SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
 
 
 
index 057dae056be759ebf2b94caadfae73d47c4e0397..c4b10a4cf9e5ff00f2e90ed91003514732419b20 100644 (file)
@@ -67,7 +67,7 @@ INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
--- throw away changes, they contain oids
+-- check that this doesn't produce any changes from the heap rewrite
 SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
@@ -93,12 +93,11 @@ COMMIT;
 /* display results */
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
--- hide changes bc of oid visible in full table rewrites
 CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
 INSERT INTO tr_unique(data) VALUES(10);
 ALTER TABLE tr_unique RENAME TO tr_pkey;
 ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
-SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
 
 INSERT INTO tr_pkey(data) VALUES(1);
 --show deletion with primary key
index f0b37f67fc6f7849f488791ec90e15abd60ec1ca..a94aeeae292287d54dce3784f216f624146f739e 100644 (file)
@@ -101,6 +101,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        ctx->output_plugin_private = data;
 
        opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
+       opt->receive_rewrites = false;
 
        foreach(option, ctx->output_plugin_options)
        {
@@ -166,6 +167,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                                                 errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                                                strVal(elem->arg), elem->defname)));
                }
+               else if (strcmp(elem->defname, "include-rewrites") == 0)
+               {
+
+                       if (elem->arg == NULL)
+                               continue;
+                       else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("could not parse value \"%s\" for parameter \"%s\"",
+                                                               strVal(elem->arg), elem->defname)));
+               }
                else
                {
                        ereport(ERROR,
@@ -412,6 +424,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                                   quote_qualified_identifier(
                                                                                                          get_namespace_name(
                                                                                                                                                 get_rel_namespace(RelationGetRelid(relation))),
+                                                                                                         class_form->relrewrite ?
+                                                                                                         get_rel_name(class_form->relrewrite) :
                                                                                                          NameStr(class_form->relname)));
        appendStringInfoChar(ctx->out, ':');
 
index 30e674130504e7846a66bc508ff55f02fe279d12..fc81133f07dbf388445d30112c8c8f6c9546b4cc 100644 (file)
@@ -1923,6 +1923,18 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry>True if table is a partition</entry>
      </row>
 
+     <row>
+      <entry><structfield>relrewrite</structfield></entry>
+      <entry><type>oid</type></entry>
+      <entry><literal><link linkend="catalog-pg-class"><structname>pg_class</structname></link>.oid</literal></entry>
+      <entry>
+       For new relations being written during a DDL operation that requires a
+       table rewrite, this contains the OID of the original relation;
+       otherwise 0.  That state is only visible internally; this field should
+       never contain anything other than 0 for a user-visible relation.
+      </entry>
+     </row>
+
      <row>
       <entry><structfield>relfrozenxid</structfield></entry>
       <entry><type>xid</type></entry>
index 5501eed108602a7d9d18e8df4075c4a895298781..f6b14dccb094206ec04e935637efeb4d18edd919 100644 (file)
@@ -486,12 +486,17 @@ typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
 typedef struct OutputPluginOptions
 {
     OutputPluginOutputType output_type;
+    bool        receive_rewrites;
 } OutputPluginOptions;
 </programlisting>
       <literal>output_type</literal> has to either be set to
       <literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
       or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
       <xref linkend="logicaldecoding-output-mode"/>.
+      If <literal>receive_rewrites</literal> is true, the output plugin will
+      also be called for changes made by heap rewrites during certain DDL
+      operations.  These are of interest to plugins that handle DDL
+      replication, but they require special handling.
      </para>
 
      <para>
index ed7a55596f8a689251acf1c0d807465f7873f6eb..4ea3aa97cf73cb4d780d9ab0ef65e48770432703 100644 (file)
@@ -257,6 +257,7 @@ Boot_CreateStmt:
                                                                                                          false,
                                                                                                          true,
                                                                                                          false,
+                                                                                                         InvalidOid,
                                                                                                          NULL);
                                                elog(DEBUG4, "relation created with OID %u", id);
                                        }
index 0497332e9d7fad01d4f676ed7d5a2184226bed0d..ca2c2f9952066f228a6981b2669e8c66f1722555 100644 (file)
@@ -806,6 +806,7 @@ InsertPgClassTuple(Relation pg_class_desc,
        values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
        values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
        values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
+       values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
        values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
        values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
        if (relacl != (Datum) 0)
@@ -1038,6 +1039,7 @@ heap_create_with_catalog(const char *relname,
                                                 bool use_user_acl,
                                                 bool allow_system_table_mods,
                                                 bool is_internal,
+                                                Oid relrewrite,
                                                 ObjectAddress *typaddress)
 {
        Relation        pg_class_desc;
@@ -1176,6 +1178,8 @@ heap_create_with_catalog(const char *relname,
 
        Assert(relid == RelationGetRelid(new_rel_desc));
 
+       new_rel_desc->rd_rel->relrewrite = relrewrite;
+
        /*
         * Decide whether to create an array type over the relation's rowtype. We
         * do not create any array types for system catalogs (ie, those made
index 8bf269854501972d55c6ac9d2d844061f87691cd..c4515e6c1d188ad7539afbfb4f906161828a7efa 100644 (file)
@@ -279,6 +279,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
                                                                                   false,
                                                                                   true,
                                                                                   true,
+                                                                                  InvalidOid,
                                                                                   NULL);
        Assert(toast_relid != InvalidOid);
 
index 96a51bb7603cb546e798a6dabbba4734b7aa5908..57f3917fdc4de36a9ea455907af864be56f73438 100644 (file)
@@ -692,6 +692,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence,
                                                                                  false,
                                                                                  true,
                                                                                  true,
+                                                                                 OIDOldHeap,
                                                                                  NULL);
        Assert(OIDNewHeap != InvalidOid);
 
index 8f83aa46753841df2f76cbbbac8482e49cfbc4cb..2ec99f99f935d9e77133e4e878c33636836a6aea 100644 (file)
@@ -764,6 +764,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
                                                                                  true,
                                                                                  allowSystemTableMods,
                                                                                  false,
+                                                                                 InvalidOid,
                                                                                  typaddress);
 
        /* Store inheritance information for new rel. */
index 7637efc32e055f45eb3ae0c6534797163bc8b9b2..e2e39f4577904d4ddcf2d0e3eff91769290d2020 100644 (file)
@@ -317,6 +317,8 @@ CreateInitDecodingContext(char *plugin,
                startup_cb_wrapper(ctx, &ctx->options, true);
        MemoryContextSwitchTo(old_context);
 
+       ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
+
        return ctx;
 }
 
@@ -410,6 +412,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                startup_cb_wrapper(ctx, &ctx->options, false);
        MemoryContextSwitchTo(old_context);
 
+       ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
+
        ereport(LOG,
                        (errmsg("starting logical decoding for slot \"%s\"",
                                        NameStr(slot->data.name)),
index 7612cf5f04c57f38addae50ac3220f0154d199c6..5ffe638b19ce844afb24eca0b4bf156a237c3c45 100644 (file)
@@ -1402,6 +1402,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                        if (!RelationIsLogicallyLogged(relation))
                                                goto change_done;
 
+                                       /*
+                                        * Ignore temporary heaps created during DDL unless the
+                                        * plugin has asked for them.
+                                        */
+                                       if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+                                               goto change_done;
+
                                        /*
                                         * For now ignore sequence changes entirely. Most of the
                                         * time they don't log changes using records we
index d538f25ede6c038be69dcd98f0d4a8409fd10a54..aa9cf5b54ed28af559132b79abbce950916ef3d8 100644 (file)
@@ -21,7 +21,6 @@
 
 #include "utils/inval.h"
 #include "utils/int8.h"
-#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
@@ -511,31 +510,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
                {
                        Publication *pub = lfirst(lc);
 
-                       /*
-                        * Skip tables that look like they are from a heap rewrite (see
-                        * make_new_heap()).  We need to skip them because the subscriber
-                        * won't have a table by that name to receive the data.  That
-                        * means we won't ship the new data in, say, an added column with
-                        * a DEFAULT, but if the user applies the same DDL manually on the
-                        * subscriber, then this will work out for them.
-                        *
-                        * We only need to consider the alltables case, because such a
-                        * transient heap won't be an explicit member of a publication.
-                        */
-                       if (pub->alltables)
-                       {
-                               char       *relname = get_rel_name(relid);
-                               unsigned int u;
-                               int                     n;
-
-                               if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 &&
-                                       relname[n] == '\0')
-                               {
-                                       if (get_rel_relkind(u) == RELKIND_RELATION)
-                                               break;
-                               }
-                       }
-
                        if (pub->alltables || list_member_oid(pubids, pub->oid))
                        {
                                entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
index 96d3406abe09be62a9225a4c8ef3db260dea0e22..3a3593be8dc561cf1c0833837751eefd98bfedeb 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     201803141
+#define CATALOG_VERSION_NO     201803211
 
 #endif
index 9bdc63ceb5ac782b2fa43e63611b5e6305323da0..3308fa3dfd5a1dd3aad4af5add99da1f85a1086d 100644 (file)
@@ -71,6 +71,7 @@ extern Oid heap_create_with_catalog(const char *relname,
                                                 bool use_user_acl,
                                                 bool allow_system_table_mods,
                                                 bool is_internal,
+                                                Oid relrewrite,
                                                 ObjectAddress *typaddress);
 
 extern void heap_create_init_fork(Relation rel);
index 7fc355acb893813ce16c01ab401bd054857f1571..85cdb99f1f642b002d89c7a61bda73252e3f13cf 100644 (file)
@@ -70,6 +70,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
        bool            relispopulated; /* matview currently holds query results */
        char            relreplident;   /* see REPLICA_IDENTITY_xxx constants  */
        bool            relispartition; /* is relation a partition? */
+       Oid                     relrewrite;             /* heap for rewrite during DDL, link to original rel */
        TransactionId relfrozenxid; /* all Xids < this are frozen in this rel */
        TransactionId relminmxid;       /* all multixacts in this rel are >= this.
                                                                 * this is really a MultiXactId */
@@ -98,7 +99,7 @@ typedef FormData_pg_class *Form_pg_class;
  * ----------------
  */
 
-#define Natts_pg_class                                         32
+#define Natts_pg_class                                         33
 #define Anum_pg_class_relname                          1
 #define Anum_pg_class_relnamespace                     2
 #define Anum_pg_class_reltype                          3
@@ -126,11 +127,12 @@ typedef FormData_pg_class *Form_pg_class;
 #define Anum_pg_class_relispopulated           25
 #define Anum_pg_class_relreplident                     26
 #define Anum_pg_class_relispartition           27
-#define Anum_pg_class_relfrozenxid                     28
-#define Anum_pg_class_relminmxid                       29
-#define Anum_pg_class_relacl                           30
-#define Anum_pg_class_reloptions                       31
-#define Anum_pg_class_relpartbound                     32
+#define Anum_pg_class_relrewrite                       28
+#define Anum_pg_class_relfrozenxid                     29
+#define Anum_pg_class_relminmxid                       30
+#define Anum_pg_class_relacl                           31
+#define Anum_pg_class_reloptions                       32
+#define Anum_pg_class_relpartbound                     33
 
 /* ----------------
  *             initial contents of pg_class
@@ -145,13 +147,13 @@ typedef FormData_pg_class *Form_pg_class;
  * Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId;
  * similarly, "1" in relminmxid stands for FirstMultiXactId
  */
-DATA(insert OID = 1247 (  pg_type              PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
+DATA(insert OID = 1247 (  pg_type              PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
 DESCR("");
-DATA(insert OID = 1249 (  pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f t n f 3 1 _null_ _null_ _null_));
+DATA(insert OID = 1249 (  pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f t n f 3 1 _null_ _null_ _null_));
 DESCR("");
-DATA(insert OID = 1255 (  pg_proc              PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
+DATA(insert OID = 1255 (  pg_proc              PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
 DESCR("");
-DATA(insert OID = 1259 (  pg_class             PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 32 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
+DATA(insert OID = 1259 (  pg_class             PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 33 0 t f f f f f t n f 0 3 1 _null_ _null_ _null_));
 DESCR("");
 
 
index 78fd38bb1694af76b2729881c4deb27c75d63849..82875d6b3d5045a832afe76795851c058fa404c8 100644 (file)
@@ -26,6 +26,7 @@ typedef enum OutputPluginOutputType
 typedef struct OutputPluginOptions
 {
        OutputPluginOutputType output_type;
+       bool            receive_rewrites;
 } OutputPluginOptions;
 
 /*
index 0970abca52ae36dbcaadea2faee2cc0ea7911e53..aa430c843c0b7e60fdd353d32a6d0be03145ac83 100644 (file)
@@ -336,6 +336,11 @@ struct ReorderBuffer
         */
        void       *private_data;
 
+       /*
+        * Saved output plugin option
+        */
+       bool            output_rewrites;
+
        /*
         * Private memory context.
         */