summaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
authorAmit Kapila2022-03-22 01:41:19 +0000
committerAmit Kapila2022-03-22 01:41:19 +0000
commit208c5d65bbd60e33e272964578cb74182ac726a8 (patch)
tree5f3a99783f4c7be35c16237c5b10ebc711b37293 /src/test
parent315ae75e9b6da72456eaa44e55ace9ab1b95ef74 (diff)
Add ALTER SUBSCRIPTION ... SKIP.
This feature allows skipping the transaction on subscriber nodes. If incoming change violates any constraint, logical replication stops until it's resolved. Currently, users need to either manually resolve the conflict by updating a subscriber-side database or by using function pg_replication_origin_advance() to skip the conflicting transaction. This commit introduces a simpler way to skip the conflicting transactions. The user can specify LSN by ALTER SUBSCRIPTION ... SKIP (lsn = XXX), which allows the apply worker to skip the transaction finished at specified LSN. The apply worker skips all data modification changes within the transaction. Author: Masahiko Sawada Reviewed-by: Takamichi Osumi, Hou Zhijie, Peter Eisentraut, Amit Kapila, Shi Yu, Vignesh C, Greg Nancarrow, Haiying Tang, Euler Taveira Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
Diffstat (limited to 'src/test')
-rw-r--r--src/test/regress/expected/subscription.out126
-rw-r--r--src/test/regress/sql/subscription.sql11
-rw-r--r--src/test/subscription/t/029_disable_on_error.pl94
-rw-r--r--src/test/subscription/t/029_on_error.pl183
4 files changed, 264 insertions, 150 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index ad8003fae12..7fcfad15916 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -93,11 +93,25 @@ ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2
ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345
+(1 row)
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+ERROR: invalid WAL location (LSN): 0/0
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@@ -129,10 +143,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@@ -165,19 +179,19 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +202,19 @@ ERROR: streaming requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
@@ -215,10 +229,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more then once
@@ -233,10 +247,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +284,10 @@ ERROR: two_phase requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
@@ -282,10 +296,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +308,10 @@ DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -309,18 +323,18 @@ ERROR: disable_on_error requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index a7c15b1dafc..74c38ead5d6 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -72,6 +72,17 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = '');
ALTER SUBSCRIPTION regress_doesnotexist CONNECTION 'dbname=regress_doesnotexist2';
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
+-- ok
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
+
+\dRs+
+
+-- ok - with lsn = NONE
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
+
+-- fail
+ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
+
\dRs+
BEGIN;
diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl
deleted file mode 100644
index 5eca8044460..00000000000
--- a/src/test/subscription/t/029_disable_on_error.pl
+++ /dev/null
@@ -1,94 +0,0 @@
-
-# Copyright (c) 2021-2022, PostgreSQL Global Development Group
-
-# Test of logical replication subscription self-disabling feature.
-use strict;
-use warnings;
-use PostgreSQL::Test::Cluster;
-use PostgreSQL::Test::Utils;
-use Test::More;
-
-# create publisher node
-my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
-$node_publisher->init(allows_streaming => 'logical');
-$node_publisher->start;
-
-# create subscriber node
-my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
-$node_subscriber->init;
-$node_subscriber->start;
-
-# Create identical table on both nodes.
-$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
-$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
-
-# Insert duplicate values on the publisher.
-$node_publisher->safe_psql('postgres',
- "INSERT INTO tbl (i) VALUES (1), (1), (1)");
-
-# Create an additional unique index on the subscriber.
-$node_subscriber->safe_psql('postgres',
- "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
-
-# Create a pub/sub to set up logical replication. This tests that the
-# uniqueness violation will cause the subscription to fail during initial
-# synchronization and make it disabled.
-my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
-$node_publisher->safe_psql('postgres',
- "CREATE PUBLICATION pub FOR TABLE tbl");
-$node_subscriber->safe_psql('postgres',
- "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)"
-);
-
-# Initial synchronization failure causes the subscription to be disabled.
-$node_subscriber->poll_query_until('postgres',
- "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
-) or die "Timed out while waiting for subscriber to be disabled";
-
-# Drop the unique index on the subscriber which caused the subscription to be
-# disabled.
-$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
-
-# Re-enable the subscription "sub".
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-
-# Wait for the data to replicate.
-$node_publisher->wait_for_catchup('sub');
-$node_subscriber->poll_query_until('postgres',
- "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
-);
-
-# Confirm that we have finished the table sync.
-my $result =
- $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl");
-is($result, qq(1|3), "subscription sub replicated data");
-
-# Delete the data from the subscriber and recreate the unique index.
-$node_subscriber->safe_psql('postgres', "DELETE FROM tbl");
-$node_subscriber->safe_psql('postgres',
- "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
-
-# Add more non-unique data to the publisher.
-$node_publisher->safe_psql('postgres',
- "INSERT INTO tbl (i) VALUES (3), (3), (3)");
-
-# Apply failure causes the subscription to be disabled.
-$node_subscriber->poll_query_until('postgres',
- "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
-) or die "Timed out while waiting for subscription sub to be disabled";
-
-# Drop the unique index on the subscriber and re-enabled the subscription. Then
-# confirm that the previously failing insert was applied OK.
-$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
-
-$node_publisher->wait_for_catchup('sub');
-
-$result = $node_subscriber->safe_psql('postgres',
- "SELECT COUNT(*) FROM tbl WHERE i = 3");
-is($result, qq(3), 'check the result of apply');
-
-$node_subscriber->stop;
-$node_publisher->stop;
-
-done_testing();
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
new file mode 100644
index 00000000000..e8b904b7452
--- /dev/null
+++ b/src/test/subscription/t/029_on_error.pl
@@ -0,0 +1,183 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Tests for disable_on_error and SKIP transaction features.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $offset = 0;
+
+# Test skipping the transaction. This function must be called after the caller
+# has inserted data that conflicts with the subscriber. The finish LSN of the
+# error transaction that is used to specify to ALTER SUBSCRIPTION ... SKIP is
+# fetched from the server logs. After executing ALTER SUBSCRITPION ... SKIP, we
+# check if logical replication can continue working by inserting $nonconflict_data
+# on the publisher.
+sub test_skip_lsn
+{
+ my ($node_publisher, $node_subscriber, $nonconflict_data, $expected, $msg)
+ = @_;
+
+ # Wait until a conflict occurs on the subscriber.
+ $node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'"
+ );
+
+ # Get the finish LSN of the error transaction.
+ my $contents = slurp_file($node_subscriber->logfile, $offset);
+ $contents =~
+ qr/processing remote data for replication origin \"pg_\d+\" during "INSERT" for replication target relation "public.tbl" in transaction \d+ finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/
+ or die "could not get error-LSN";
+ my $lsn = $1;
+
+ # Set skip lsn.
+ $node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn')");
+
+ # Re-enable the subscription.
+ $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+ # Wait for the failed transaction to be skipped
+ $node_subscriber->poll_query_until('postgres',
+ "SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'"
+ );
+
+ # Check the log to ensure that the transaction is skipped, and advance the
+ # offset of the log file for the next test.
+ $offset = $node_subscriber->wait_for_log(
+ qr/LOG: done skipping logical replication transaction finished at $lsn/,
+ $offset);
+
+ # Insert non-conflict data
+ $node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl VALUES $nonconflict_data");
+
+ $node_publisher->wait_for_catchup('sub');
+
+ # Check replicated data
+ my $res =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+ is($res, $expected, $msg);
+}
+
+# Create publisher node. Set a low value of logical_decoding_work_mem to test
+# streaming cases.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf(
+ 'postgresql.conf',
+ qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf(
+ 'postgresql.conf',
+ qq[
+max_prepared_transactions = 10
+]);
+$node_subscriber->start;
+
+# Initial table setup on both publisher and subscriber. On the subscriber, we
+# create the same tables but with a primary key. Also, insert some data that
+# will conflict with the data replicated from publisher later.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+CREATE TABLE tbl (i INT, t TEXT);
+INSERT INTO tbl VALUES (1, NULL);
+]);
+$node_subscriber->safe_psql(
+ 'postgres',
+ qq[
+CREATE TABLE tbl (i INT PRIMARY KEY, t TEXT);
+INSERT INTO tbl VALUES (1, NULL);
+]);
+
+# Create a pub/sub to set up logical replication. This tests that the
+# uniqueness violation will cause the subscription to fail during initial
+# synchronization and make it disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub FOR TABLE tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true, streaming = on, two_phase = on)"
+);
+
+# Initial synchronization failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Truncate the table on the subscriber which caused the subscription to be
+# disabled.
+$node_subscriber->safe_psql('postgres', "TRUNCATE tbl");
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the data to replicate.
+$node_publisher->wait_for_catchup('sub');
+$node_subscriber->poll_query_until('postgres',
+ "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
+);
+
+# Confirm that we have finished the table sync.
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT COUNT(*) FROM tbl");
+is($result, qq(1), "subscription sub replicated data");
+
+# Insert data to tbl, raising an error on the subscriber due to violation
+# of the unique constraint on tbl. Then skip the transaction.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+INSERT INTO tbl VALUES (1, NULL);
+COMMIT;
+]);
+test_skip_lsn($node_publisher, $node_subscriber,
+ "(2, NULL)", "2", "test skipping transaction");
+
+# Test for PREPARE and COMMIT PREPARED. Insert the same data to tbl and
+# PREPARE the transaction, raising an error. Then skip the transaction.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+INSERT INTO tbl VALUES (1, NULL);
+PREPARE TRANSACTION 'gtx';
+COMMIT PREPARED 'gtx';
+]);
+test_skip_lsn($node_publisher, $node_subscriber,
+ "(3, NULL)", "3", "test skipping prepare and commit prepared ");
+
+# Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB
+# limit, also raising an error on the subscriber during applying spooled
+# changes for the same reason. Then skip the transaction.
+$node_publisher->safe_psql(
+ 'postgres',
+ qq[
+BEGIN;
+INSERT INTO tbl SELECT i, md5(i::text) FROM generate_series(1, 10000) s(i);
+COMMIT;
+]);
+test_skip_lsn($node_publisher, $node_subscriber, "(4, md5(4::text))",
+ "4", "test skipping stream-commit");
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT COUNT(*) FROM pg_prepared_xacts");
+is($result, "0",
+ "check all prepared transactions are resolved on the subscriber");
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();