# up standby
# Wait for the standby to receive and write all WAL.
- my $wal_received_query =
-"SELECT pg_current_wal_lsn() = write_lsn FROM pg_stat_replication WHERE application_name = 'rewind_standby';";
- $node_master->poll_query_until('postgres', $wal_received_query)
- or die "Timed out while waiting for standby to receive and write WAL";
+ $node_master->wait_for_catchup('rewind_standby', 'write');
# Now promote standby and insert some new data on master, this will put
# the master out-of-sync with the standby.
=item $node->wait_for_catchup(standby_name, mode, target_lsn)
-Wait for the node with application_name standby_name (usually from node->name)
+Wait for the node with application_name standby_name (usually from node->name,
+also works for logical subscriptions)
until its replication location in pg_stat_replication equals or passes the
upstream's WAL insert point at the time this function is called. By default
the replay_lsn is waited for, but 'mode' may be specified to wait for any of
Requires that the 'postgres' db exists and is accessible.
target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
+If omitted, pg_current_wal_lsn() is used.
This is not a test. It die()s on failure.
{
$standby_name = $standby_name->name;
}
- die 'target_lsn must be specified' unless defined($target_lsn);
+ my $lsn_expr;
+ if (defined($target_lsn))
+ {
+ $lsn_expr = "'$target_lsn'";
+ }
+ else
+ {
+ $lsn_expr = 'pg_current_wal_lsn()'
+ }
print "Waiting for replication conn "
. $standby_name . "'s "
. $mode
. $target_lsn . " on "
. $self->name . "\n";
my $query =
-qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
$self->poll_query_until('postgres', $query)
- or die "timed out waiting for catchup, current location is "
- . ($self->safe_psql('postgres', $query) || '(unknown)');
+ or die "timed out waiting for catchup";
print "done\n";
}
my $query =
qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
$self->poll_query_until('postgres', $query)
- or die "timed out waiting for catchup, current location is "
- . ($self->safe_psql('postgres', $query) || '(unknown)');
+ or die "timed out waiting for catchup";
print "done\n";
}
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub, tap_pub_ins_only"
);
-# Wait for subscriber to finish initialization
-my $caughtup_query =
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_mixed VALUES (2, 'bar')");
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
$node_publisher->safe_psql('postgres',
"UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'");
-# Wait for subscription to catch up
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_full");
"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
$node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), min(a), max(a) FROM tab_ins");
);
$node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)");
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# note that data are different on provider and subscriber
$result = $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
);
-# Wait for subscriber to finish initialization
-my $caughtup_query =
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# Wait for initial sync to finish as well
my $synced_query =
(4, '"yellow horse"=>"moaned"');
));
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# Check the data on subscriber
my $result = $node_subscriber->safe_psql(
UPDATE tst_hstore SET b = '"also"=>"updated"' WHERE a = 3;
));
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# Check the data on subscriber
$result = $node_subscriber->safe_psql(
DELETE FROM tst_hstore WHERE a = 1;
));
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# Check the data on subscriber
$result = $node_subscriber->safe_psql(
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (copy_data = false)"
);
-# Wait for subscriber to finish initialization
-my $caughtup_query =
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk (bid) VALUES (1);");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk_ref (id, bid) VALUES (1, 1);");
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# Check data on subscriber
my $result = $node_subscriber->safe_psql('postgres',
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);");
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# FK is not enforced on subscriber
$result = $node_subscriber->safe_psql('postgres',
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);");
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# The row should be skipped on subscriber
$result = $node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
);
-# Wait for subscriber to finish initialization
-my $caughtup_query =
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';";
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)");
-# Wait for subscription to catch up
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM tab_rep_next");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_rep_next SELECT generate_series(1,10)");
-# Wait for subscription to catch up
-$node_publisher->poll_query_until('postgres', $caughtup_query)
- or die "Timed out while waiting for subscriber to catch up";
+$node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM tab_rep_next");
use TestLib;
use Test::More tests => 1;
-sub wait_for_caught_up
-{
- my ($node, $appname) = @_;
-
- $node->poll_query_until('postgres',
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
- ) or die "Timed out while waiting for subscriber to catch up";
-}
-
my $node_publisher = get_new_node('publisher');
$node_publisher->init(
allows_streaming => 'logical',
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
);
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
# Wait for initial sync to finish as well
my $synced_query =
$node_publisher->safe_psql('postgres',
q{INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')}); # hand-rolled UTF-8
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
is( $node_subscriber->safe_psql(
'postgres', q{SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'}
use TestLib;
use Test::More tests => 2;
-sub wait_for_caught_up
-{
- my ($node, $appname) = @_;
-
- $node->poll_query_until('postgres',
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
- ) or die "Timed out while waiting for subscriber to catch up";
-}
-
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
);
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
# Wait for initial sync to finish as well
my $synced_query =
$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two');});
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
is($node_subscriber->safe_psql('postgres', q{SELECT a, b FROM test1}),
qq(1|one
$node_subscriber->safe_psql('postgres', $ddl2);
$node_publisher->safe_psql('postgres', $ddl2);
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
$node_publisher->safe_psql('postgres', q{INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33);});
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
is($node_subscriber->safe_psql('postgres', q{SELECT a, b, c FROM test1}),
qq(1|one|0
use TestLib;
use Test::More tests => 1;
-sub wait_for_caught_up
-{
- my ($node, $appname) = @_;
-
- $node->poll_query_until('postgres',
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
- ) or die "Timed out while waiting for subscriber to catch up";
-}
-
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION mypub;"
);
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
$node_subscriber->safe_psql('postgres', q{
BEGIN;
use TestLib;
use Test::More tests => 3;
-sub wait_for_caught_up
-{
- my ($node, $appname) = @_;
-
- $node->poll_query_until('postgres',
-"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';"
- ) or die "Timed out while waiting for subscriber to catch up";
-}
-
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"
);
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
# subscriber didn't change
$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(b)");
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
-wait_for_caught_up($node_publisher, $appname);
+$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");