summaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/test')
-rw-r--r--src/test/regress/expected/subscription.out63
-rw-r--r--src/test/regress/sql/subscription.sql15
-rw-r--r--src/test/subscription/t/015_stream.pl98
3 files changed, 156 insertions, 20 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index d71db0d5207..2fa9bce66a4 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+---------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
@@ -162,19 +162,42 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+(1 row)
+
+DROP SUBSCRIPTION regress_testsub;
+-- fail - streaming must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
+ERROR: streaming requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index eeb2ec06ebe..14fa0b247e1 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -132,6 +132,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
+-- fail - streaming must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+\dRs+
+
+DROP SUBSCRIPTION regress_testsub;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
new file mode 100644
index 00000000000..fffe0019650
--- /dev/null
+++ b/src/test/subscription/t/015_stream.pl
@@ -0,0 +1,98 @@
+# Test streaming of simple large transaction
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
+
+# Test the streaming in binary mode
+$node_subscriber->safe_psql('postgres',
+"ALTER SUBSCRIPTION tap_sub SET (binary = on)"
+);
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
+
+# Change the local values of the extra columns on the subscriber,
+# update publisher, and check that subscriber retains the expected
+# values. This is to ensure that non-streaming transactions behave
+# properly after a streaming transaction.
+$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
+is($result, qq(6667|6667|6667), 'check extra columns contain locally changed data');
+
+$node_subscriber->stop;
+$node_publisher->stop;