diff options
| author | Peter Eisentraut | 2017-03-23 12:36:36 +0000 |
|---|---|---|
| committer | Peter Eisentraut | 2017-03-23 12:55:37 +0000 |
| commit | 7c4f52409a8c7d85ed169bbbc1f6092274d03920 (patch) | |
| tree | fa3dc592bb2855e5cc0a200f4c408b4c8d299be5 /src/test/subscription | |
| parent | 707576b571f05ec5b89adb65964d55f3ccccbd1b (diff) | |
Logical replication support for initial data copy
Add functionality for a new subscription to copy the initial data in the
tables and then sync with the ongoing apply process.
For the copying, add a new internal COPY option to have the COPY source
data provided by a callback function. The initial data copy works on
the subscriber by receiving COPY data from the publisher and then
providing it locally into a COPY that writes to the destination table.
A WAL receiver can now execute full SQL commands. This is used here to
obtain information about tables and publications.
Several new options were added to CREATE and ALTER SUBSCRIPTION to
control whether and when initial table syncing happens.
Change pg_dump option --no-create-subscription-slots to
--no-subscription-connect and use the new CREATE SUBSCRIPTION
... NOCONNECT option for that.
Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Tested-by: Erik Rijkers <er@xs4all.nl>
Diffstat (limited to 'src/test/subscription')
| -rw-r--r-- | src/test/subscription/t/001_rep_changes.pl | 36 | ||||
| -rw-r--r-- | src/test/subscription/t/002_types.pl | 6 | ||||
| -rw-r--r-- | src/test/subscription/t/003_constraints.pl | 2 | ||||
| -rw-r--r-- | src/test/subscription/t/004_sync.pl | 159 |
4 files changed, 194 insertions, 9 deletions
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index b81028aed14..d1817f57da4 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 11; +use Test::More tests => 14; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -19,7 +19,7 @@ $node_subscriber->start; $node_publisher->safe_psql('postgres', "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab_ins (a int)"); + "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); $node_publisher->safe_psql('postgres', @@ -56,10 +56,20 @@ my $caughtup_query = $node_publisher->poll_query_until('postgres', $caughtup_query) or die "Timed out while waiting for subscriber to catch up"; +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); is($result, qq(0), 'check non-replicated table is empty on subscriber'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_ins SELECT generate_series(1,50)"); $node_publisher->safe_psql('postgres', @@ -79,7 +89,7 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); -is($result, qq(50|1|50), 'check replicated inserts on subscriber'); +is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); @@ -109,7 +119,7 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); -is($result, qq(10|1|100), 'update works with REPLICA IDENTITY FULL and duplicate tuples'); +is($result, qq(20|1|100), 'update works with REPLICA IDENTITY FULL and duplicate tuples'); # check that change of connection string and/or publication list causes # restart of subscription workers. Not all of these are registered as tests @@ -126,7 +136,7 @@ $node_publisher->poll_query_until('postgres', $oldpid = $node_publisher->safe_psql('postgres', "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"); $node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only"); + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only REFRESH WITH (NOCOPY DATA)"); $node_publisher->poll_query_until('postgres', "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';") or die "Timed out while waiting for apply to restart"; @@ -141,7 +151,7 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); -is($result, qq(150|1|1100), 'check replicated inserts after subscription publication change'); +is($result, qq(1152|1|1100), 'check replicated inserts after subscription publication change'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); @@ -154,6 +164,8 @@ $node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (NOCOPY DATA)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)"); @@ -163,11 +175,11 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) # note that data are different on provider and subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); -is($result, qq(50|1|50), 'check replicated deletes after alter publication'); +is($result, qq(1052|1|1002), 'check replicated deletes after alter publication'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); -is($result, qq(11|0|100), 'check replicated insert after alter publication'); +is($result, qq(21|0|100), 'check replicated insert after alter publication'); # check restart on rename $oldpid = $node_publisher->safe_psql('postgres', @@ -190,6 +202,14 @@ $result = is($result, qq(0), 'check replication slot was dropped on publisher'); $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber'); + +$result = + $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); is($result, qq(0), 'check replication origin was dropped on subscriber'); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl index f44e1e671d2..ad15e85c0ca 100644 --- a/src/test/subscription/t/002_types.pl +++ b/src/test/subscription/t/002_types.pl @@ -111,6 +111,12 @@ my $caughtup_query = $node_publisher->poll_query_until('postgres', $caughtup_query) or die "Timed out while waiting for subscriber to catch up"; +# Wait for initial sync to finish as well +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + # Insert initial test data $node_publisher->safe_psql('postgres', qq( -- test_tbl_one_array_col diff --git a/src/test/subscription/t/003_constraints.pl b/src/test/subscription/t/003_constraints.pl index b785132f5b2..11b82541551 100644 --- a/src/test/subscription/t/003_constraints.pl +++ b/src/test/subscription/t/003_constraints.pl @@ -34,7 +34,7 @@ $node_publisher->safe_psql('postgres', my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub;"); + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (NOCOPY DATA)"); # Wait for subscriber to finish initialization my $caughtup_query = diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl new file mode 100644 index 00000000000..87541a0e6e1 --- /dev/null +++ b/src/test/subscription/t/004_sync.pl @@ -0,0 +1,159 @@ +# Tests for logical replication table syncing +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,10)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_wal_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(10), 'initial data synced for first sub'); + +# drop subscription so that there is unreplicated data +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(11,20)"); + +# recreate the subscription, it will try to do initial copy +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# but it will be stuck on data copy as it will fail on constraint +my $started_query = +"SELECT srsubstate = 'd' FROM pg_subscription_rel;"; +$node_subscriber->poll_query_until('postgres', $started_query) + or die "Timed out while waiting for subscriber to start sync"; + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', + "DELETE FROM tab_rep;"); + +# wait for sync to finish this time +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(20), 'initial data synced for second sub'); + +# now check another subscription for the same node pair +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (NOCOPY DATA)"); + +# wait for it to start +$node_subscriber->poll_query_until('postgres', "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL") + or die "Timed out while waiting for subscriber to start"; + +# and drop both subscriptions +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2"); + +# check subscriptions are removed +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'second and third sub are dropped'); + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', + "DELETE FROM tab_rep;"); + +# recreate the subscription again +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# and wait for data sync to finish again +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(20), 'initial data synced for fourth sub'); + +# add new table on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep_next (a int)"); + +# setup structure with existing data on pubisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)"); + +# Wait for subscription to catch up +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); +is($result, qq(0), 'no data for table added after subscription initialized'); + +# ask for data sync +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +# wait for sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); +is($result, qq(10), 'data for table added after subscription initialized are now synced'); + +# Add some data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep_next SELECT generate_series(1,10)"); + +# Wait for subscription to catch up +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); +is($result, qq(20), 'changes for table added after subscription initialized replicated'); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); |
