# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+ 'autovacuum = off');
$node_publisher->start;
# Create subscriber node
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
);
+$node_publisher->wait_for_catchup('tap_sub');
+
# Ensure a transactional logical decoding message shows up on the slot
$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
-# wait for the replication connection to drop from the publisher
+# wait for the replication slot to become inactive in the publisher
$node_publisher->poll_query_until('postgres',
- 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", 1);
$node_publisher->safe_psql('postgres',
"SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
$result = $node_publisher->safe_psql(
'postgres', qq(
SELECT get_byte(data, 0)
- FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub')
));
'option messages defaults to false so message (M) is not available on slot'
);
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
-$node_publisher->wait_for_catchup('tap_sub');
-
-# ensure a non-transactional logical decoding message shows up on the slot
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
-
-# wait for the replication connection to drop from the publisher
-$node_publisher->poll_query_until('postgres',
- 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
-
$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
my $message_lsn = $node_publisher->safe_psql('postgres',
$result = $node_publisher->safe_psql(
'postgres', qq(
SELECT get_byte(data, 0), get_byte(data, 1)
- FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+ FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub',
'messages', 'true')
is($result, qq(77|0), 'non-transactional message on slot is M');
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
-$node_publisher->wait_for_catchup('tap_sub');
-
-$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
-
-# wait for the replication connection to drop from the publisher
-$node_publisher->poll_query_until('postgres',
- 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
-
# Ensure a non-transactional logical decoding message shows up on the slot when
# it is emitted within an aborted transaction. The message won't emit until
# something advances the LSN, hence, we intentionally forces the server to