diff options
| author | Amit Kapila | 2023-01-09 01:30:39 +0000 |
|---|---|---|
| committer | Amit Kapila | 2023-01-09 02:22:45 +0000 |
| commit | 216a784829c2c5f03ab0c43e009126cbb819e9b2 (patch) | |
| tree | 9051220c20b086f981c941397b775b9c83023d43 /src/test/subscription | |
| parent | 5687e7810f1dd32ac1960e67b608c441d87bc229 (diff) | |
Perform apply of large transactions by parallel workers.
Currently, for large transactions, the publisher sends the data in
multiple streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber-side, the apply
worker writes the changes into temporary files and once it receives the
commit, it reads from those files and applies the entire transaction. To
improve the performance of such transactions, we can instead allow them to
be applied via parallel workers.
In this approach, we assign a new parallel apply worker (if available) as
soon as the xact's first stream is received and the leader apply worker
will send changes to this new worker via shared memory. The parallel apply
worker will directly apply the change instead of writing it to temporary
files. However, if the leader apply worker times out while attempting to
send a message to the parallel apply worker, it will switch to
"partial serialize" mode - in this mode, the leader serializes all
remaining changes to a file and notifies the parallel apply worker to
read and apply them at the end of the transaction. We use a non-blocking
way to send the messages from the leader apply worker to the parallel
apply worker to avoid deadlocks. We keep this parallel apply worker assigned till the
transaction commit is received and also wait for the worker to finish at
commit. This preserves commit ordering and avoids writing to and reading
from files in most cases. We still need to spill if there is no worker
available.
This patch also extends the SUBSCRIPTION 'streaming' parameter so that the
user can control whether to apply the streaming transaction in a parallel
apply worker or spill the change to disk. The user can set the streaming
parameter to 'on/off', or 'parallel'. The parameter value 'parallel' means
the streaming will be applied via a parallel apply worker, if available.
The parameter value 'on' means the streaming transaction will be spilled
to disk. The default value is 'off' (same as current behaviour).
In addition, the patch extends the logical replication STREAM_ABORT
message so that abort_lsn and abort_time can also be sent which can be
used to update the replication origin in parallel apply worker when the
streaming transaction is aborted. Because this message extension is needed
to support parallel streaming, parallel streaming is not supported for
publications on servers < PG16.
Author: Hou Zhijie, Wang wei, Amit Kapila with design inputs from Sawada Masahiko
Reviewed-by: Sawada Masahiko, Peter Smith, Dilip Kumar, Shi yu, Kuroda Hayato, Shveta Mallik
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
Diffstat (limited to 'src/test/subscription')
| -rw-r--r-- | src/test/subscription/t/015_stream.pl | 274 | ||||
| -rw-r--r-- | src/test/subscription/t/016_stream_subxact.pl | 130 | ||||
| -rw-r--r-- | src/test/subscription/t/017_stream_ddl.pl | 3 | ||||
| -rw-r--r-- | src/test/subscription/t/018_stream_subxact_abort.pl | 220 | ||||
| -rw-r--r-- | src/test/subscription/t/019_stream_subxact_ddl_abort.pl | 3 | ||||
| -rw-r--r-- | src/test/subscription/t/022_twophase_cascade.pl | 2 | ||||
| -rw-r--r-- | src/test/subscription/t/023_twophase_stream.pl | 525 |
7 files changed, 789 insertions, 368 deletions
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index e259c100d58..91e8aa8c0a5 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -8,6 +8,128 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + # Interleave a pair of transactions, each exceeding the 64kB limit. + my $in = ''; + my $out = ''; + + my $offset = 0; + + my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); + + my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, + on_error_stop => 0); + + # Check the subscriber log from now on. 
+ $offset = -s $node_subscriber->logfile; + + $in .= q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + }; + $h->pump_nb; + + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); + DELETE FROM test_tab WHERE a > 5000; + COMMIT; + }); + + $in .= q{ + COMMIT; + \q + }; + $h->finish; # errors make the next test fail, so ignore them here + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'check extra columns contain local defaults'); + + # Test the streaming in binary mode + $node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Insert, update and delete enough rows to exceed the 64kB limit. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(6667|6667|6667), + 'check extra columns contain local defaults'); + + # Change the local values of the extra columns on the subscriber, + # update publisher, and check that subscriber retains the expected + # values. 
This is to ensure that non-streaming transactions behave + # properly after a streaming transaction. + $node_subscriber->safe_psql('postgres', + "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + $node_publisher->safe_psql('postgres', + "UPDATE test_tab SET b = md5(a::text)"); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" + ); + is($result, qq(6667|6667|6667), + 'check extra columns contain locally changed data'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -26,17 +148,27 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); + # Setup structure on subscriber $node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" ); +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab on test_tab_2(a)"); + # Setup logical replication my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + "CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,6 +181,43 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" +); + +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + +# Test that the deadlock is detected among the leader and parallel apply +# workers. 
+ +$node_subscriber->append_conf('postgresql.conf', "deadlock_timeout = 10ms"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + # Interleave a pair of transactions, each exceeding the 64kB limit. my $in = ''; my $out = ''; @@ -58,73 +227,90 @@ my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, on_error_stop => 0); +# Confirm if a deadlock between the leader apply worker and the parallel apply +# worker can be detected. + +my $offset = -s $node_subscriber->logfile; + $in .= q{ BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; +INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); }; $h->pump_nb; -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); -DELETE FROM test_tab WHERE a > 5000; -COMMIT; -}); +# Ensure that the parallel apply worker executes the insert command before the +# leader worker. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/, + $offset); + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)"); $in .= q{ COMMIT; \q }; -$h->finish; # errors make the next test fail, so ignore them here +$h->finish; +$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, + $offset); + +# In order for the two transactions to be completed normally without causing +# conflicts due to the unique index, we temporarily drop it. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. 
$node_publisher->wait_for_catchup($appname); $result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(5001), 'data replicated to subscriber after dropping index'); -# Test the streaming in binary mode +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); +$node_publisher->wait_for_catchup($appname); $node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + "CREATE UNIQUE INDEX idx_tab on test_tab_2(a)"); -# Insert, update and delete enough rows to exceed the 64kB limit. -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -COMMIT; -}); +# Confirm if a deadlock between two parallel apply workers can be detected. -$node_publisher->wait_for_catchup($appname); +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(6667|6667|6667), 'check extra columns contain local defaults'); +$in .= q{ +BEGIN; +INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +# Ensure that the first parallel apply worker executes the insert command +# before the second one. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/, + $offset); -# Change the local values of the extra columns on the subscriber, -# update publisher, and check that subscriber retains the expected -# values. 
This is to ensure that non-streaming transactions behave -# properly after a streaming transaction. -$node_subscriber->safe_psql('postgres', - "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" -); $node_publisher->safe_psql('postgres', - "UPDATE test_tab SET b = md5(a::text)"); + "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, + $offset); + +# In order for the two transactions to be completed normally without causing +# conflicts due to the unique index, we temporarily drop it. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" -); -is($result, qq(6667|6667|6667), - 'check extra columns contain locally changed data'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(10000), 'data replicated to subscriber after dropping index'); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl index 649dd95f7a5..2f0148c3a86 100644 --- a/src/test/subscription/t/016_stream_subxact.pl +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -8,6 +8,73 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s3; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s4; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(12|12|12), + 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' + ); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); +} + # Create publisher node my $node_publisher = 
PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -37,6 +104,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,41 +120,34 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -# Insert, update and delete some rows. -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s3; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s4; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(12|12|12), - 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' 
+test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl index 2fbd643e988..626676a383b 100644 --- a/src/test/subscription/t/017_stream_ddl.pl +++ b/src/test/subscription/t/017_stream_ddl.pl @@ -2,6 +2,9 @@ # Copyright (c) 2021-2023, PostgreSQL Global Development Group # Test streaming of large transaction with DDL and subtransactions +# +# This file is mainly to test the DDL/DML interaction of the publisher side, +# so we didn't add a parallel apply version for the tests in this file. 
use strict; use warnings; use PostgreSQL::Test::Cluster; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 170ee10c1db..dce14b150af 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -8,6 +8,124 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with DDL, DML and ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (3, md5(3::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (4, md5(4::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (5, md5(5::text)); + SAVEPOINT s3; + INSERT INTO test_tab VALUES (6, md5(6::text)); + ROLLBACK TO s2; + INSERT INTO test_tab VALUES (7, md5(7::text)); + ROLLBACK TO s1; + INSERT INTO test_tab VALUES (8, md5(8::text)); + SAVEPOINT s4; + INSERT INTO test_tab VALUES (9, md5(9::text)); + SAVEPOINT s5; + INSERT INTO test_tab VALUES (10, md5(10::text)); + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(6|0), + 'check rollback to savepoint was reflected on subscriber and extra 
columns contain local defaults' + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with subscriber receiving out of order + # subtransaction ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (11, md5(11::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (12, md5(12::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (13, md5(13::text)); + SAVEPOINT s3; + INSERT INTO test_tab VALUES (14, md5(14::text)); + RELEASE s2; + INSERT INTO test_tab VALUES (15, md5(15::text)); + ROLLBACK TO s1; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(7|0), + 'check rollback to savepoint was reflected on subscriber'); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # streamed transaction with subscriber receiving rollback + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (16, md5(16::text)); + SAVEPOINT s1; + INSERT INTO test_tab VALUES (17, md5(17::text)); + SAVEPOINT s2; + INSERT INTO test_tab VALUES (18, md5(18::text)); + ROLLBACK; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'ABORT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(7|0), 'check rollback was reflected on subscriber'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -36,6 +154,10 @@ 
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -48,81 +170,33 @@ my $result = "SELECT count(*), count(c) FROM test_tab"); is($result, qq(2|0), 'check initial data was copied to subscriber'); -# streamed transaction with DDL, DML and ROLLBACKs -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (3, md5(3::text)); -SAVEPOINT s1; -INSERT INTO test_tab VALUES (4, md5(4::text)); -SAVEPOINT s2; -INSERT INTO test_tab VALUES (5, md5(5::text)); -SAVEPOINT s3; -INSERT INTO test_tab VALUES (6, md5(6::text)); -ROLLBACK TO s2; -INSERT INTO test_tab VALUES (7, md5(7::text)); -ROLLBACK TO s1; -INSERT INTO test_tab VALUES (8, md5(8::text)); -SAVEPOINT s4; -INSERT INTO test_tab VALUES (9, md5(9::text)); -SAVEPOINT s5; -INSERT INTO test_tab VALUES (10, md5(10::text)); -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(6|0), - 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -# streamed transaction with subscriber receiving out of order subtransaction -# ROLLBACKs -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (11, md5(11::text)); -SAVEPOINT s1; -INSERT INTO 
test_tab VALUES (12, md5(12::text)); -SAVEPOINT s2; -INSERT INTO test_tab VALUES (13, md5(13::text)); -SAVEPOINT s3; -INSERT INTO test_tab VALUES (14, md5(14::text)); -RELEASE s2; -INSERT INTO test_tab VALUES (15, md5(15::text)); -ROLLBACK TO s1; -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(7|0), - 'check rollback to savepoint was reflected on subscriber'); - -# streamed transaction with subscriber receiving rollback -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (16, md5(16::text)); -SAVEPOINT s1; -INSERT INTO test_tab VALUES (17, md5(17::text)); -SAVEPOINT s2; -INSERT INTO test_tab VALUES (18, md5(18::text)); -ROLLBACK; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(7|0), 'check rollback was reflected on subscriber'); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. 
+$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +test_streaming($node_publisher, $node_subscriber, $appname, 1); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl index 962e347288f..b30223de51b 100644 --- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl +++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl @@ -3,6 +3,9 @@ # Test streaming of transaction with subtransactions, DDLs, DMLs, and # rollbacks +# +# This file is mainly to test the DDL/DML interaction of the publisher side, +# so we didn't add a parallel apply version for the tests in this file. use strict; use warnings; use PostgreSQL::Test::Cluster; diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index 8e3e74b134c..e624ebe55c6 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -5,6 +5,8 @@ # # Includes tests for options 2PC (not-streaming) and also for 2PC (streaming). # +# Two-phase and parallel apply will be tested in 023_twophase_stream, so we +# didn't add a parallel apply version for the tests in this file. use strict; use warnings; use PostgreSQL::Test::Cluster; diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index 7cd80174a10..75ca83765e6 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -8,6 +8,289 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check that the parallel apply worker has finished applying the streaming +# transaction. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
finished processing the STREAM $type command/, + $offset); + } +} + +# Common test steps for both the streaming=on and streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + ############################### + # Test 2PC PREPARE / COMMIT PREPARED. + # 1. Data is streamed as a 2PC transaction. + # 2. Then do commit prepared. + # + # Expect all data is replicated on subscriber side after the commit. + ############################### + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # check that 2PC gets replicated to subscriber + # Insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # 2PC transaction gets committed + $node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is committed on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(4|4|4), + 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' + ); + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber'); + + ############################### + # 
Test 2PC PREPARE / ROLLBACK PREPARED. + # 1. Table is deleted back to 2 rows which are replicated on subscriber. + # 2. Data is streamed using 2PC. + # 3. Do rollback prepared. + # + # Expect data rolls back leaving only the original 2 rows. + ############################### + + # First, delete the data except for 2 rows (will be replicated) + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Then insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # 2PC transaction gets aborted + $node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is aborted on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(2|2|2), + 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows' + ); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is aborted on subscriber'); + + ############################### + # Check that 2PC COMMIT PREPARED is decoded properly on crash restart. + # 1. insert, update and delete some rows. + # 2. 
Then server crashes before the 2PC transaction is committed. + # 3. After servers are restarted the pending transaction is committed. + # + # Expect all data is replicated on subscriber side after the commit. + # Note: both publisher and subscriber do crash/restart. + ############################### + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_subscriber->stop('immediate'); + $node_publisher->stop('immediate'); + + $node_publisher->start; + $node_subscriber->start; + + # We don't try to check the log for parallel option here as the subscriber + # may have stopped after finishing the prepare and before logging the + # appropriate message. + + # commit post the restart + $node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + $node_publisher->wait_for_catchup($appname); + + # check inserts are visible + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(4|4|4), + 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' + ); + + ############################### + # Do INSERT after the PREPARE but before ROLLBACK PREPARED. + # 1. Table is deleted back to 2 rows which are replicated on subscriber. + # 2. Data is streamed using 2PC. + # 3. A single row INSERT is done which is after the PREPARE. + # 4. Then do a ROLLBACK PREPARED. + # + # Expect the 2PC data rolls back leaving only 3 rows on the subscriber + # (the original 2 + inserted 1). 
+ ############################### + + # First, delete the data except for 2 rows (will be replicated) + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Then insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # Insert a different record (now we are outside of the 2PC transaction) + # Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key + $node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + + # 2PC transaction gets aborted + $node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is aborted on subscriber, + # but the extra INSERT outside of the 2PC still was replicated + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3|3|3), + 'check the outside insert was copied to subscriber'); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is aborted on subscriber'); + + ############################### + # Do INSERT after the PREPARE but before COMMIT PREPARED. + # 1. 
Table is deleted back to 2 rows which are replicated on subscriber. + # 2. Data is streamed using 2PC. + # 3. A single row INSERT is done which is after the PREPARE. + # 4. Then do a COMMIT PREPARED. + # + # Expect 2PC data + the extra row are on the subscriber + # (the 3334 + inserted 1 = 3335). + ############################### + + # First, delete the data except for 2 rows (will be replicated) + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Then insert, update and delete some rows. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # Insert a different record (now we are outside of the 2PC transaction) + # Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key + $node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + + # 2PC transaction gets committed + $node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is committed on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(5|5|5), + 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local 
defaults' + ); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + $node_publisher->wait_for_catchup($appname); +} + ############################### # Setup ############################### @@ -48,6 +331,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql( 'postgres', " CREATE SUBSCRIPTION tap_sub @@ -64,236 +351,38 @@ my $twophase_query = $node_subscriber->poll_query_until('postgres', $twophase_query) or die "Timed out while waiting for subscriber to enable twophase"; -############################### # Check initial data was copied to subscriber -############################### my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -############################### -# Test 2PC PREPARE / COMMIT PREPARED. -# 1. Data is streamed as a 2PC transaction. -# 2. Then do commit prepared. -# -# Expect all data is replicated on subscriber side after the commit. -############################### - -# check that 2PC gets replicated to subscriber -# Insert, update and delete some rows. 
-$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# 2PC transaction gets committed -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); +test_streaming($node_publisher, $node_subscriber, $appname, 0); -# check that transaction is committed on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(4|4|4), - 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber'); - -############################### -# Test 2PC PREPARE / ROLLBACK PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. Do rollback prepared. -# -# Expect data rolls back leaving only the original 2 rows. 
-############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete some rows. -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# 2PC transaction gets aborted -$node_publisher->safe_psql('postgres', - "ROLLBACK PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is aborted on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(2|2|2), - 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is aborted on subscriber'); - -############################### -# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. -# 1. insert, update and delete some rows. -# 2. Then server crashes before the 2PC transaction is committed. -# 3. After servers are restarted the pending transaction is committed. -# -# Expect all data is replicated on subscriber side after the commit. -# Note: both publisher and subscriber do crash/restart. 
-############################### - -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_subscriber->stop('immediate'); -$node_publisher->stop('immediate'); - -$node_publisher->start; -$node_subscriber->start; -# commit post the restart -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); -$node_publisher->wait_for_catchup($appname); - -# check inserts are visible -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(4|4|4), - 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' -); - -############################### -# Do INSERT after the PREPARE but before ROLLBACK PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. A single row INSERT is done which is after the PREPARE. -# 4. Then do a ROLLBACK PREPARED. -# -# Expect the 2PC data rolls back leaving only 3 rows on the subscriber -# (the original 2 + inserted 1). -############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete some rows. 
-$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# Insert a different record (now we are outside of the 2PC transaction) -# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key -$node_publisher->safe_psql('postgres', - "INSERT INTO test_tab VALUES (99999, 'foobar')"); - -# 2PC transaction gets aborted -$node_publisher->safe_psql('postgres', - "ROLLBACK PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is aborted on subscriber, -# but the extra INSERT outside of the 2PC still was replicated -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3|3|3), 'check the outside insert was copied to subscriber'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is aborted on subscriber'); - -############################### -# Do INSERT after the PREPARE but before COMMIT PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. A single row INSERT is done which is after the PREPARE. -# 4. Then do a COMMIT PREPARED. -# -# Expect 2PC data + the extra row are on the subscriber -# (the 3334 + inserted 1 = 3335). 
-############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete some rows. -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); -# Insert a different record (now we are outside of the 2PC transaction) -# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key -$node_publisher->safe_psql('postgres', - "INSERT INTO test_tab VALUES (99999, 'foobar')"); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -# 2PC transaction gets committed -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); +# We need to check DEBUG logs to ensure that the parallel apply worker has +# applied the transaction. So, bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; -$node_publisher->wait_for_catchup($appname); +# Run a query to make sure that the reload has taken effect. 
+$node_subscriber->safe_psql('postgres', q{SELECT 1}); -# check that transaction is committed on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(5|5|5), - 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults' -); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber'); +test_streaming($node_publisher, $node_subscriber, $appname, 1); ############################### # check all the cleanup |
