diff options
Diffstat (limited to 'src/test')
-rw-r--r-- | src/test/regress/expected/subscription.out | 126 | ||||
-rw-r--r-- | src/test/regress/sql/subscription.sql | 11 | ||||
-rw-r--r-- | src/test/subscription/t/029_disable_on_error.pl | 94 | ||||
-rw-r--r-- | src/test/subscription/t/029_on_error.pl | 183 |
4 files changed, 264 insertions, 150 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index ad8003fae12..7fcfad15916 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -93,11 +93,25 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2 ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" +-- ok +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345 +(1 row) + +-- ok - with lsn = NONE +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); +-- fail +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); +ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -129,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -165,19 +179,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -188,19 +202,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -215,10 +229,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more then once @@ -233,10 +247,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -270,10 +284,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -282,10 +296,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -294,10 +308,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -309,18 +323,18 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index a7c15b1dafc..74c38ead5d6 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -72,6 +72,17 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = ''); ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2'; ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); +-- ok +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); + +\dRs+ + +-- ok - with lsn = NONE +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); + +-- fail +ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); + \dRs+ BEGIN; diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl deleted file mode 100644 index 5eca8044460..00000000000 --- a/src/test/subscription/t/029_disable_on_error.pl +++ /dev/null @@ -1,94 +0,0 @@ - -# Copyright (c) 2021-2022, PostgreSQL Global Development Group - -# Test of logical replication subscription self-disabling feature. -use strict; -use warnings; -use PostgreSQL::Test::Cluster; -use PostgreSQL::Test::Utils; -use Test::More; - -# create publisher node -my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); -$node_publisher->init(allows_streaming => 'logical'); -$node_publisher->start; - -# create subscriber node -my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); -$node_subscriber->init; -$node_subscriber->start; - -# Create identical table on both nodes. -$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)"); -$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)"); - -# Insert duplicate values on the publisher. -$node_publisher->safe_psql('postgres', - "INSERT INTO tbl (i) VALUES (1), (1), (1)"); - -# Create an additional unique index on the subscriber. -$node_subscriber->safe_psql('postgres', - "CREATE UNIQUE INDEX tbl_unique ON tbl (i)"); - -# Create a pub/sub to set up logical replication. This tests that the -# uniqueness violation will cause the subscription to fail during initial -# synchronization and make it disabled. -my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; -$node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub FOR TABLE tbl"); -$node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)" -); - -# Initial synchronization failure causes the subscription to be disabled. -$node_subscriber->poll_query_until('postgres', - "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" -) or die "Timed out while waiting for subscriber to be disabled"; - -# Drop the unique index on the subscriber which caused the subscription to be -# disabled. -$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique"); - -# Re-enable the subscription "sub". -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); - -# Wait for the data to replicate. -$node_publisher->wait_for_catchup('sub'); -$node_subscriber->poll_query_until('postgres', - "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass" -); - -# Confirm that we have finished the table sync. -my $result = - $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl"); -is($result, qq(1|3), "subscription sub replicated data"); - -# Delete the data from the subscriber and recreate the unique index. -$node_subscriber->safe_psql('postgres', "DELETE FROM tbl"); -$node_subscriber->safe_psql('postgres', - "CREATE UNIQUE INDEX tbl_unique ON tbl (i)"); - -# Add more non-unique data to the publisher. -$node_publisher->safe_psql('postgres', - "INSERT INTO tbl (i) VALUES (3), (3), (3)"); - -# Apply failure causes the subscription to be disabled. -$node_subscriber->poll_query_until('postgres', - "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" -) or die "Timed out while waiting for subscription sub to be disabled"; - -# Drop the unique index on the subscriber and re-enabled the subscription. Then -# confirm that the previously failing insert was applied OK. -$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique"); -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); - -$node_publisher->wait_for_catchup('sub'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT COUNT(*) FROM tbl WHERE i = 3"); -is($result, qq(3), 'check the result of apply'); - -$node_subscriber->stop; -$node_publisher->stop; - -done_testing(); diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl new file mode 100644 index 00000000000..e8b904b7452 --- /dev/null +++ b/src/test/subscription/t/029_on_error.pl @@ -0,0 +1,183 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Tests for disable_on_error and SKIP transaction features. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $offset = 0; + +# Test skipping the transaction. This function must be called after the caller +# has inserted data that conflicts with the subscriber. The finish LSN of the +# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is +# fetched from the server logs. After executing ALTER SUBSCRITPION ... SKIP, we +# check if logical replication can continue working by inserting $nonconflict_data +# on the publisher. +sub test_skip_lsn +{ + my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg) + = @_; + + # Wait until a conflict occurs on the subscriber. + $node_subscriber->poll_query_until('postgres', + "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'" + ); + + # Get the finish LSN of the error transaction. + my $contents = slurp_file($node_subscriber->logfile, $offset); + $contents =~ + qr/processing remote data for replication origin \"pg_\d+\" during "INSERT" for replication target relation "public.tbl" in transaction \d+ finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/ + or die "could not get error-LSN"; + my $lsn = $1; + + # Set skip lsn. + $node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')"); + + # Re-enable the subscription. + $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); + + # Wait for the failed transaction to be skipped + $node_subscriber->poll_query_until('postgres', + "SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'" + ); + + # Check the log to ensure that the transaction is skipped, and advance the + # offset of the log file for the next test. + $offset = $node_subscriber->wait_for_log( + qr/LOG: done skipping logical replication transaction finished at $lsn/, + $offset); + + # Insert non-conflict data + $node_publisher->safe_psql('postgres', + "INSERT INTO tbl VALUES $nonconflict_data"); + + $node_publisher->wait_for_catchup('sub'); + + # Check replicated data + my $res = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl"); + is($res, $expected, $msg); +} + +# Create publisher node. Set a low value of logical_decoding_work_mem to test +# streaming cases. +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf( + 'postgresql.conf', + qq[ +logical_decoding_work_mem = 64kB +max_prepared_transactions = 10 +]); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf( + 'postgresql.conf', + qq[ +max_prepared_transactions = 10 +]); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On the subscriber, we +# create the same tables but with a primary key. Also, insert some data that +# will conflict with the data replicated from publisher later. +$node_publisher->safe_psql( + 'postgres', + qq[ +CREATE TABLE tbl (i INT, t TEXT); +INSERT INTO tbl VALUES (1, NULL); +]); +$node_subscriber->safe_psql( + 'postgres', + qq[ +CREATE TABLE tbl (i INT PRIMARY KEY, t TEXT); +INSERT INTO tbl VALUES (1, NULL); +]); + +# Create a pub/sub to set up logical replication. This tests that the +# uniqueness violation will cause the subscription to fail during initial +# synchronization and make it disabled. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR TABLE tbl"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)" +); + +# Initial synchronization failure causes the subscription to be disabled. +$node_subscriber->poll_query_until('postgres', + "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'" +) or die "Timed out while waiting for subscriber to be disabled"; + +# Truncate the table on the subscriber which caused the subscription to be +# disabled. +$node_subscriber->safe_psql('postgres', "TRUNCATE tbl"); + +# Re-enable the subscription "sub". +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); + +# Wait for the data to replicate. +$node_publisher->wait_for_catchup('sub'); +$node_subscriber->poll_query_until('postgres', + "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass" +); + +# Confirm that we have finished the table sync. +my $result = + $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM tbl"); +is($result, qq(1), "subscription sub replicated data"); + +# Insert data to tbl, raising an error on the subscriber due to violation +# of the unique constraint on tbl. Then skip the transaction. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO tbl VALUES (1, NULL); +COMMIT; +]); +test_skip_lsn($node_publisher, $node_subscriber, + "(2, NULL)", "2", "test skipping transaction"); + +# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and +# PREPARE the transaction, raising an error. Then skip the transaction. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO tbl VALUES (1, NULL); +PREPARE TRANSACTION 'gtx'; +COMMIT PREPARED 'gtx'; +]); +test_skip_lsn($node_publisher, $node_subscriber, + "(3, NULL)", "3", "test skipping prepare and commit prepared "); + +# Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB +# limit, also raising an error on the subscriber during applying spooled +# changes for the same reason. Then skip the transaction. +$node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO tbl SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i); +COMMIT; +]); +test_skip_lsn($node_publisher, $node_subscriber, "(4, md5(4::text))", + "4", "test skipping stream-commit"); + +$result = $node_subscriber->safe_psql('postgres', + "SELECT COUNT(*) FROM pg_prepared_xacts"); +is($result, "0", + "check all prepared transactions are resolved on the subscriber"); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing(); |