summaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
authorAmit Kapila2021-02-12 02:11:51 +0000
committerAmit Kapila2021-02-12 02:11:51 +0000
commitce0fdbfe9722867b7fad4d3ede9b6a6bfc51fb4e (patch)
treebe540b24d4cc30cbbd52e92ac164239b6773a699 /src/test
parent3063eb17593c3ad498ce4e89db3358862ea2dbb6 (diff)
Allow multiple xacts during table sync in logical replication.
For the initial table data synchronization in logical replication, we use a single transaction to copy the entire table and then synchronize the position in the stream with the main apply worker. There are multiple downsides of this approach: (a) We have to perform the entire copy operation again if there is any error (network breakdown, error in the database operation, etc.) while we synchronize the WAL position between tablesync worker and apply worker; this will be onerous especially for large copies, (b) Using a single transaction in the synchronization-phase (where we can receive WAL from multiple transactions) will have the risk of exceeding the CID limit, (c) The slot will hold the WAL till the entire sync is complete because we never commit till the end. This patch solves all the above downsides by allowing multiple transactions during the tablesync phase. The initial copy is done in a single transaction and after that, we commit each transaction as we receive. To allow recovery after any error or crash, we use a permanent slot and origin to track the progress. The slot and origin will be removed once we finish the synchronization of the table. We also remove slot and origin of tablesync workers if the user performs DROP SUBSCRIPTION .. or ALTER SUBSCRIPTION .. REFERESH and some of the table syncs are still not finished. The commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION and ALTER SUBSCRIPTION ... SET PUBLICATION ... with refresh option as true cannot be executed inside a transaction block because they can now drop the slots for which we have no provision to rollback. This will also open up the path for logical replication of 2PC transactions on the subscriber side. Previously, we can't do that because of the requirement of maintaining a single transaction in tablesync workers. Bump catalog version due to change of state in the catalog (pg_subscription_rel). Author: Peter Smith, Amit Kapila, and Takamichi Osumi Reviewed-by: Ajin Cherian, Petr Jelinek, Hou Zhijie and Amit Kapila Discussion: https://postgr.es/m/CAA4eK1KHJxaZS-fod-0fey=0tq3=Gkn4ho=8N4-5HWiCfu0H1A@mail.gmail.com
Diffstat (limited to 'src/test')
-rw-r--r--src/test/regress/expected/subscription.out21
-rw-r--r--src/test/regress/sql/subscription.sql22
-rw-r--r--src/test/subscription/t/004_sync.pl21
3 files changed, 63 insertions, 1 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 2fa9bce66a4..7802279cb2e 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -201,6 +201,27 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
(1 row)
DROP SUBSCRIPTION regress_testsub;
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub
+ WITH (enabled = true, create_slot = false, copy_data = false);
+-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction
+-- block or function
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true);
+ERROR: ALTER SUBSCRIPTION with refresh cannot run inside a transaction block
+END;
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
+ERROR: ALTER SUBSCRIPTION ... REFRESH cannot run inside a transaction block
+END;
+CREATE FUNCTION func() RETURNS VOID AS
+$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
+SELECT func();
+ERROR: ALTER SUBSCRIPTION with refresh cannot be executed from a function
+CONTEXT: SQL function "func" statement 1
+ALTER SUBSCRIPTION regress_testsub DISABLE;
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+DROP FUNCTION func;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 14fa0b247e1..ca0d7827429 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -147,6 +147,28 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=postgres' PUBLICATION mypub
+ WITH (enabled = true, create_slot = false, copy_data = false);
+
+-- fail - ALTER SUBSCRIPTION with refresh is not allowed in a transaction
+-- block or function
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true);
+END;
+
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub REFRESH PUBLICATION;
+END;
+
+CREATE FUNCTION func() RETURNS VOID AS
+$$ ALTER SUBSCRIPTION regress_testsub SET PUBLICATION mypub WITH (refresh = true) $$ LANGUAGE SQL;
+SELECT func();
+
+ALTER SUBSCRIPTION regress_testsub DISABLE;
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+DROP FUNCTION func;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl
index e111ab91810..c7926681b66 100644
--- a/src/test/subscription/t/004_sync.pl
+++ b/src/test/subscription/t/004_sync.pl
@@ -3,7 +3,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 8;
# Initialize publisher node
my $node_publisher = get_new_node('publisher');
@@ -149,7 +149,26 @@ $result = $node_subscriber->safe_psql('postgres',
is($result, qq(20),
'changes for table added after subscription initialized replicated');
+# clean up
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_rep_next");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_rep_next");
$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+# Table tap_rep already has the same records on both publisher and subscriber
+# at this time. Recreate the subscription which will do the initial copy of
+# the table again and fails due to unique constraint violation.
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub");
+
+$result = $node_subscriber->poll_query_until('postgres', $started_query)
+ or die "Timed out while waiting for subscriber to start sync";
+
+# DROP SUBSCRIPTION must clean up slots on the publisher side when the
+# subscriber is stuck on data copy for constraint violation.
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+$result = $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(0), 'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
+
$node_subscriber->stop('fast');
$node_publisher->stop('fast');