diff options
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/regress/expected/subscription.out | 63 | ||||
| -rw-r--r-- | src/test/regress/sql/subscription.sql | 15 | ||||
| -rw-r--r-- | src/test/subscription/t/015_stream.pl | 98 |
3 files changed, 156 insertions, 20 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index d71db0d5207..2fa9bce66a4 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -162,19 +162,42 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; +-- fail - streaming must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo); +ERROR: streaming requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (streaming = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index eeb2ec06ebe..14fa0b247e1 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -132,6 +132,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - streaming must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (streaming = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl new file mode 100644 index 00000000000..fffe0019650 --- /dev/null +++ b/src/test/subscription/t/015_stream.pl @@ -0,0 +1,98 @@ +# Test streaming of simple large transaction +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Test the streaming in binary mode +$node_subscriber->safe_psql('postgres', +"ALTER SUBSCRIPTION tap_sub SET (binary = on)" +); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(6667|6667|6667), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values. This is to ensure that non-streaming transactions behave +# properly after a streaming transaction. +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(6667|6667|6667), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; |
