CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
CheckPointRelationMap();
- CheckPointReplicationSlots();
+ CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
CheckPointSnapBuild();
CheckPointLogicalRewriteHeap();
CheckPointReplicationOrigin();
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->candidate_restart_lsn = InvalidXLogRecPtr;
+ slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
/*
* Create the slot on disk. We haven't actually marked the slot allocated
/*
* Flush all replication slots to disk.
*
- * This needn't actually be part of a checkpoint, but it's a convenient
- * location.
+ * It is convenient to flush dirty replication slots at the time of checkpoint.
+ * Additionally, in case of a shutdown checkpoint, we also identify the slots
+ * for which the confirmed_flush LSN has been updated since the last time it
+ * was saved and flush them.
*/
void
-CheckPointReplicationSlots(void)
+CheckPointReplicationSlots(bool is_shutdown)
{
int i;
/* save the slot to disk, locking is handled in SaveSlotToPath() */
sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
+
+ /*
+ * Slot's data is not flushed each time the confirmed_flush LSN is
+ * updated as that could lead to frequent writes. However, we decide
+ * to force a flush of all logical slot's data at the time of shutdown
+ * if the confirmed_flush LSN is changed since we last flushed it to
+ * disk. This helps in avoiding an unnecessary retreat of the
+ * confirmed_flush LSN after restart.
+ */
+ if (is_shutdown && SlotIsLogical(s))
+ {
+ SpinLockAcquire(&s->mutex);
+
+ Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);
+
+ if (s->data.invalidated == RS_INVAL_NONE &&
+ s->data.confirmed_flush != s->last_saved_confirmed_flush)
+ {
+ s->just_dirtied = true;
+ s->dirty = true;
+ }
+ SpinLockRelease(&s->mutex);
+ }
+
SaveSlotToPath(s, path, LOG);
}
LWLockRelease(ReplicationSlotAllocationLock);
/*
* Successfully wrote, unset dirty bit, unless somebody dirtied again
- * already.
+ * already and remember the confirmed_flush LSN value.
*/
SpinLockAcquire(&slot->mutex);
if (!slot->just_dirtied)
slot->dirty = false;
+ slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
SpinLockRelease(&slot->mutex);
LWLockRelease(&slot->io_in_progress_lock);
/* initialize in memory state */
slot->effective_xmin = cp.slotdata.xmin;
slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
+ slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
slot->candidate_catalog_xmin = InvalidTransactionId;
slot->candidate_xmin_lsn = InvalidXLogRecPtr;
XLogRecPtr candidate_xmin_lsn;
XLogRecPtr candidate_restart_valid;
XLogRecPtr candidate_restart_lsn;
+
+ /*
+ * This value tracks the last confirmed_flush LSN flushed which is used
+ * during a shutdown checkpoint to decide if logical's slot data should be
+ * forcibly flushed or not.
+ */
+ XLogRecPtr last_saved_confirmed_flush;
} ReplicationSlot;
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
extern void StartupReplicationSlots(void);
-extern void CheckPointReplicationSlots(void);
+extern void CheckPointReplicationSlots(bool is_shutdown);
extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
't/035_standby_logical_decoding.pl',
't/036_truncated_dropped.pl',
't/037_invalid_database.pl',
+ 't/038_save_logical_slots_shutdown.pl',
],
},
}
--- /dev/null
+
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Test logical replication slots are always flushed to disk during a shutdown
+# checkpoint.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+sub compare_confirmed_flush
+{
+ my ($node, $confirmed_flush_from_log) = @_;
+
+ # Fetch Latest checkpoint location from the control file
+ my ($stdout, $stderr) =
+ run_command([ 'pg_controldata', $node->data_dir ]);
+ my @control_data = split("\n", $stdout);
+ my $latest_checkpoint = undef;
+ foreach (@control_data)
+ {
+ if ($_ =~ /^Latest checkpoint location:\s*(.*)$/mg)
+ {
+ $latest_checkpoint = $1;
+ last;
+ }
+ }
+ die "Latest checkpoint location not found in control file\n"
+ unless defined($latest_checkpoint);
+
+ # Is it same as the value read from log?
+ ok( $latest_checkpoint eq $confirmed_flush_from_log,
+ "Check that the slot's confirmed_flush LSN is the same as the latest_checkpoint location"
+ );
+
+ return;
+}
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('pub');
+$node_publisher->init(allows_streaming => 'logical');
+# Avoid checkpoint during the test, otherwise, the latest checkpoint location
+# will change.
+$node_publisher->append_conf(
+ 'postgresql.conf', q{
+checkpoint_timeout = 1h
+autovacuum = off
+});
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('sub');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables
+$node_publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)");
+
+# Insert some data
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tbl VALUES (generate_series(1, 5));");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION pub FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub"
+);
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'sub');
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl");
+
+is($result, qq(5), "check initial copy was done");
+
+my $offset = -s $node_publisher->logfile;
+
+# Restart the publisher to ensure that the slot will be flushed if required
+$node_publisher->restart();
+
+# Wait until the walsender creates decoding context
+$node_publisher->wait_for_log(
+ qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./,
+ $offset);
+
+# Extract confirmed_flush from the logfile
+my $log_contents = slurp_file($node_publisher->logfile, $offset);
+$log_contents =~
+ qr/Streaming transactions committing after ([A-F0-9]+\/[A-F0-9]+), reading WAL from ([A-F0-9]+\/[A-F0-9]+)./
+ or die "could not get confirmed_flush_lsn";
+
+# Ensure that the slot's confirmed_flush LSN is the same as the
+# latest_checkpoint location.
+compare_confirmed_flush($node_publisher, $1);
+
+done_testing();