diff options
Diffstat (limited to 'src/test')
| -rw-r--r-- | src/test/subscription/t/020_messages.pl | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl new file mode 100644 index 00000000000..d9123ed3ef1 --- /dev/null +++ b/src/test/subscription/t/020_messages.pl @@ -0,0 +1,148 @@ +# Tests that logical decoding messages +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" +); + +# Ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +$node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')" +); + +my $result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +# 66 77 67 == B M C == BEGIN MESSAGE COMMIT +is($result, qq(66 +77 +67), + 'messages on slot are B M C with message option'); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + OFFSET 1 LIMIT 1 +)); + +is($result, qq(1|pgoutput), + "flag transactional is set to 1 and prefix is pgoutput"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub') +)); + +# 66 67 == B C == BEGIN COMMIT +is($result, qq(66 +67), + 'option messages defaults to false so message (M) is not available on slot'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); + +# ensure a non-transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)"); + +my $message_lsn = $node_publisher->safe_psql('postgres', + "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)"); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') + WHERE lsn = '$message_lsn' AND xid = 0 +)); + +is($result, qq(77|0), 'non-transactional message on slot is M'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); + +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +# wait for the replication connection to drop from the publisher +$node_publisher->poll_query_until('postgres', + 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); + +# Ensure a non-transactional logical decoding message shows up on the slot when +# it is emitted within an aborted transaction. The message won't emit until +# something advances the LSN, hence, we intentionally forces the server to +# switch to a new WAL file. +$node_publisher->safe_psql( + 'postgres', qq( + BEGIN; + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 1'); + INSERT INTO tab_test VALUES (3); + SELECT pg_logical_emit_message(true, 'pgoutput', + 'a transactional message is not available if the transaction is aborted'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 2'); + ROLLBACK; + SELECT pg_switch_wal(); +)); + +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub', + 'messages', 'true') +)); + +is($result, qq(77|0 +77|0), + 'non-transactional message on slot from aborted transaction is M'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); |
