diff options
| author | Amit Kapila | 2020-09-03 02:24:07 +0000 |
|---|---|---|
| committer | Amit Kapila | 2020-09-03 02:24:07 +0000 |
| commit | 464824323e57dc4b397e8b05854d779908b55304 (patch) | |
| tree | 30a02506ae6b53475302980bc558e2a41ea429f0 /src/test | |
| parent | 66f163068030b5c5fe792a0daee27822dac43791 (diff) | |
Add support for streaming to built-in logical replication.
To add support for streaming of in-progress transactions into the
built-in logical replication, we need to do three things:
* Extend the logical replication protocol, so identify in-progress
transactions, and allow adding additional bits of information (e.g.
XID of subtransactions).
* Modify the output plugin (pgoutput) to implement the new stream
API callbacks, by leveraging the extended replication protocol.
* Modify the replication apply worker, to properly handle streamed
in-progress transaction by spilling the data to disk and then
replaying them on commit.
We however must explicitly disable streaming replication during
replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover we don't have a replication connection open so we
don't have where to send the data anyway.
Author: Tomas Vondra, Dilip Kumar and Amit Kapila
Reviewed-by: Amit Kapila, Kuntal Ghosh and Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/regress/expected/subscription.out | 63 | ||||
| -rw-r--r-- | src/test/regress/sql/subscription.sql | 15 | ||||
| -rw-r--r-- | src/test/subscription/t/015_stream.pl | 98 |
3 files changed, 156 insertions, 20 deletions
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index d71db0d5207..2fa9bce66a4 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -162,19 +162,42 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; +-- fail - streaming must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo); +ERROR: streaming requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (streaming = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index eeb2ec06ebe..14fa0b247e1 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -132,6 +132,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - streaming must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (streaming = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl new file mode 100644 index 00000000000..fffe0019650 --- /dev/null +++ b/src/test/subscription/t/015_stream.pl @@ -0,0 +1,98 @@ +# Test streaming of simple large transaction +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 4; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" +); + +$node_publisher->wait_for_catchup($appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(2|2|2), 'check initial data was copied to subscriber'); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + +# Test the streaming in binary mode +$node_subscriber->safe_psql('postgres', +"ALTER SUBSCRIPTION tap_sub SET (binary = on)" +); + +# Insert, update and delete enough rows to exceed the 64kB limit. +$node_publisher->safe_psql('postgres', q{ +BEGIN; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); +UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; +DELETE FROM test_tab WHERE mod(a,3) = 0; +COMMIT; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(6667|6667|6667), 'check extra columns contain local defaults'); + +# Change the local values of the extra columns on the subscriber, +# update publisher, and check that subscriber retains the expected +# values. This is to ensure that non-streaming transactions behave +# properly after a streaming transaction. +$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"); +$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"); +is($result, qq(6667|6667|6667), 'check extra columns contain locally changed data'); + +$node_subscriber->stop; +$node_publisher->stop; |
