summaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
authorAmit Kapila2022-03-14 04:02:40 +0000
committerAmit Kapila2022-03-14 04:02:40 +0000
commit705e20f8550c0e8e47c0b6b20b5f5ffd6ffd9e33 (patch)
treed6606b7864bdcc3439caaeccfc0dc3a31a9386e3 /src/test
parent369398ed886dc13956654777467536625e6fc7ee (diff)
Optionally disable subscriptions on error.
Logical replication apply workers for a subscription can easily get stuck in an infinite loop of attempting to apply a change, triggering an error (such as a constraint violation), exiting with the error written to the subscription server log, and restarting. To partially remedy the situation, this patch adds a new subscription option named 'disable_on_error'. To be consistent with old behavior, this option defaults to false. When true, both the tablesync worker and apply worker catch any errors thrown and disable the subscription in order to break the loop. The error is still also written in the logs. Once the subscription is disabled, users can either manually resolve the conflict/error or skip the conflicting transaction by using pg_replication_origin_advance() function. After resolving the conflict, users need to enable the subscription to allow apply process to proceed. Author: Osumi Takamichi and Mark Dilger Reviewed-by: Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying, Peter Smith, Masahiko Sawada, Shi Yu Discussion : https://postgr.es/m/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
Diffstat (limited to 'src/test')
-rw-r--r--src/test/regress/expected/subscription.out119
-rw-r--r--src/test/regress/sql/subscription.sql15
-rw-r--r--src/test/subscription/t/029_disable_on_error.pl94
3 files changed, 180 insertions, 48 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83562c..ad8003fae12 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -94,10 +94,10 @@ ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
@@ -129,10 +129,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
@@ -165,19 +165,19 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -188,19 +188,19 @@ ERROR: streaming requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication already exists
@@ -215,10 +215,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
-- fail - publication used more then once
@@ -233,10 +233,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
@@ -270,10 +270,10 @@ ERROR: two_phase requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist
(1 row)
--fail - alter of two_phase option not supported.
@@ -282,10 +282,10 @@ ERROR: unrecognized subscription parameter: "two_phase"
-- but can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -294,10 +294,33 @@ DROP SUBSCRIPTION regress_testsub;
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+ERROR: disable_on_error requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index bd0f4af1e49..a7c15b1dafc 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -228,6 +228,21 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+-- fail - disable_on_error must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/029_disable_on_error.pl b/src/test/subscription/t/029_disable_on_error.pl
new file mode 100644
index 00000000000..5eca8044460
--- /dev/null
+++ b/src/test/subscription/t/029_disable_on_error.pl
@@ -0,0 +1,94 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test of logical replication subscription self-disabling feature.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create identical table on both nodes.
+$node_publisher->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tbl (i INT)");
+
+# Insert duplicate values on the publisher.
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl (i) VALUES (1), (1), (1)");
+
+# Create an additional unique index on the subscriber.
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Create a pub/sub to set up logical replication. This tests that the
+# uniqueness violation will cause the subscription to fail during initial
+# synchronization and make it disabled.
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub FOR TABLE tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (disable_on_error = true)"
+);
+
+# Initial synchronization failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscriber to be disabled";
+
+# Drop the unique index on the subscriber which caused the subscription to be
+# disabled.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+
+# Re-enable the subscription "sub".
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the data to replicate.
+$node_publisher->wait_for_catchup('sub');
+$node_subscriber->poll_query_until('postgres',
+ "SELECT COUNT(1) = 0 FROM pg_subscription_rel sr WHERE sr.srsubstate NOT IN ('s', 'r') AND sr.srrelid = 'tbl'::regclass"
+);
+
+# Confirm that we have finished the table sync.
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT MAX(i), COUNT(*) FROM tbl");
+is($result, qq(1|3), "subscription sub replicated data");
+
+# Delete the data from the subscriber and recreate the unique index.
+$node_subscriber->safe_psql('postgres', "DELETE FROM tbl");
+$node_subscriber->safe_psql('postgres',
+ "CREATE UNIQUE INDEX tbl_unique ON tbl (i)");
+
+# Add more non-unique data to the publisher.
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO tbl (i) VALUES (3), (3), (3)");
+
+# Apply failure causes the subscription to be disabled.
+$node_subscriber->poll_query_until('postgres',
+ "SELECT subenabled = false FROM pg_catalog.pg_subscription WHERE subname = 'sub'"
+) or die "Timed out while waiting for subscription sub to be disabled";
+
+# Drop the unique index on the subscriber and re-enabled the subscription. Then
+# confirm that the previously failing insert was applied OK.
+$node_subscriber->safe_psql('postgres', "DROP INDEX tbl_unique");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+ "SELECT COUNT(*) FROM tbl WHERE i = 3");
+is($result, qq(3), 'check the result of apply');
+
+$node_subscriber->stop;
+$node_publisher->stop;
+
+done_testing();