</term>
<listitem>
<para>
- The standby's current xmin. This may be 0, if the standby is
- sending notification that Hot Standby feedback will no longer
- be sent on this connection. Later non-zero messages may
- reinitiate the feedback mechanism.
+ The standby's current global xmin, excluding the catalog_xmin from any
+ replication slots. If both this value and the following
+ catalog_xmin are 0, this is treated as a notification that Hot Standby
+ feedback will no longer be sent on this connection. Later non-zero
+ messages may reinitiate the feedback mechanism.
</para>
</listitem>
</varlistentry>
</term>
<listitem>
<para>
- The standby's current epoch.
+ The epoch of the global xmin xid on the standby.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Int32
+ </term>
+ <listitem>
+ <para>
+ The lowest catalog_xmin of any replication slots on the standby. Set to 0
+ if no catalog_xmin exists on the standby or if hot standby feedback is being
+ disabled.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>
+ Int32
+ </term>
+ <listitem>
+ <para>
+ The epoch of the catalog_xmin xid on the standby.
</para>
</listitem>
</varlistentry>
{
TimestampTz now;
TransactionId nextXid;
- uint32 nextEpoch;
- TransactionId xmin;
+ uint32 xmin_epoch, catalog_xmin_epoch;
+ TransactionId xmin, catalog_xmin;
static TimestampTz sendTime = 0;
/* initially true so we always send at least one feedback message */
static bool master_has_standby_xmin = true;
* everything else has been checked.
*/
if (hot_standby_feedback)
- xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT);
+ {
+ TransactionId slot_xmin;
+
+ /*
+ * Usually GetOldestXmin() would include both global replication slot
+ * xmin and catalog_xmin in its calculations, but we want to derive
+ * separate values for each of those. So we ask for an xmin that
+ * ignores replication slots altogether, then fold the slots' xmin back
+ * in below and report the slots' catalog_xmin separately.
+ */
+ xmin = GetOldestXmin(NULL,
+ PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
+
+ ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+
+ if (TransactionIdIsValid(slot_xmin) &&
+ TransactionIdPrecedes(slot_xmin, xmin))
+ xmin = slot_xmin;
+ }
else
+ {
xmin = InvalidTransactionId;
+ catalog_xmin = InvalidTransactionId;
+ }
/*
* Get epoch and adjust if nextXid and oldestXmin are different sides of
* the epoch boundary.
*/
- GetNextXidAndEpoch(&nextXid, &nextEpoch);
+ GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+ catalog_xmin_epoch = xmin_epoch;
if (nextXid < xmin)
- nextEpoch--;
+ xmin_epoch --;
+ if (nextXid < catalog_xmin)
+ catalog_xmin_epoch --;
- elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
- xmin, nextEpoch);
+ elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+ xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
/* Construct the message and send it. */
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'h');
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint(&reply_message, xmin, 4);
- pq_sendint(&reply_message, nextEpoch, 4);
+ pq_sendint(&reply_message, xmin_epoch, 4);
+ pq_sendint(&reply_message, catalog_xmin, 4);
+ pq_sendint(&reply_message, catalog_xmin_epoch, 4);
walrcv_send(wrconn, reply_message.data, reply_message.len);
- if (TransactionIdIsValid(xmin))
+ if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
master_has_standby_xmin = true;
else
master_has_standby_xmin = false;
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
/* compute new replication slot xmin horizon if needed */
static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
{
bool changed = false;
ReplicationSlot *slot = MyReplicationSlot;
slot->data.xmin = feedbackXmin;
slot->effective_xmin = feedbackXmin;
}
+ if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+ !TransactionIdIsNormal(feedbackCatalogXmin) ||
+ TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+ {
+ changed = true;
+ slot->data.catalog_xmin = feedbackCatalogXmin;
+ slot->effective_catalog_xmin = feedbackCatalogXmin;
+ }
SpinLockRelease(&slot->mutex);
if (changed)
}
}
+/*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * The xid/epoch pair is compared against the next xid and epoch reported
+ * by GetNextXidAndEpoch().
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.  That is: when xid is numerically
+ * <= nextXid the supplied epoch must equal nextEpoch, otherwise epoch + 1
+ * must equal nextEpoch.
+ *
+ * Returns true only if the epoch check passes and
+ * TransactionIdPrecedesOrEquals(xid, nextXid) also holds; returns false
+ * otherwise.
+ *
+ * This check doesn't care about whether clog exists for these xids
+ * at all.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+ TransactionId nextXid;
+ uint32 nextEpoch;
+
+ GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+ if (xid <= nextXid)
+ {
+ if (epoch != nextEpoch)
+ return false;
+ }
+ else
+ {
+ if (epoch + 1 != nextEpoch)
+ return false;
+ }
+
+ if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+ return false; /* epoch OK, but it's wrapped around */
+
+ return true;
+}
+
/*
* Hot Standby feedback
*/
static void
ProcessStandbyHSFeedbackMessage(void)
{
- TransactionId nextXid;
- uint32 nextEpoch;
TransactionId feedbackXmin;
uint32 feedbackEpoch;
+ TransactionId feedbackCatalogXmin;
+ uint32 feedbackCatalogEpoch;
/*
* Decipher the reply message. The caller already consumed the msgtype
- * byte.
+ * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+ * of this message.
*/
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
feedbackXmin = pq_getmsgint(&reply_message, 4);
feedbackEpoch = pq_getmsgint(&reply_message, 4);
+ feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+ feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
- elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+ elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
feedbackXmin,
- feedbackEpoch);
+ feedbackEpoch,
+ feedbackCatalogXmin,
+ feedbackCatalogEpoch);
- /* Unset WalSender's xmin if the feedback message value is invalid */
- if (!TransactionIdIsNormal(feedbackXmin))
+ /*
+ * Unset WalSender's xmins if the feedback message values are invalid.
+ * This happens when the downstream turned hot_standby_feedback off.
+ */
+ if (!TransactionIdIsNormal(feedbackXmin)
+ && !TransactionIdIsNormal(feedbackCatalogXmin))
{
MyPgXact->xmin = InvalidTransactionId;
if (MyReplicationSlot != NULL)
- PhysicalReplicationSlotNewXmin(feedbackXmin);
+ PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
return;
}
/*
* Check that the provided xmin/epoch are sane, that is, not in the future
* and not so far back as to be already wrapped around. Ignore if not.
- *
- * Epoch of nextXid should be same as standby, or if the counter has
- * wrapped, then one greater than standby.
*/
- GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
- if (feedbackXmin <= nextXid)
- {
- if (feedbackEpoch != nextEpoch)
- return;
- }
- else
- {
- if (feedbackEpoch + 1 != nextEpoch)
- return;
- }
+ if (TransactionIdIsNormal(feedbackXmin) &&
+ !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+ return;
- if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
- return; /* epoch OK, but it's wrapped around */
+ if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+ !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+ return;
/*
* Set the WalSender's xmin equal to the standby's requested xmin, so that
* already since a VACUUM could have just finished calling GetOldestXmin.)
*
* If we're using a replication slot we reserve the xmin via that,
- * otherwise via the walsender's PGXACT entry.
+ * otherwise via the walsender's PGXACT entry. We can only track the
+ * catalog xmin separately when using a slot, so when not using a slot
+ * we store the lesser (older) of the two provided values.
*
* XXX: It might make sense to generalize the ephemeral slot concept and
* always use the slot mechanism to handle the feedback xmin.
*/
if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
- PhysicalReplicationSlotNewXmin(feedbackXmin);
+ PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
else
- MyPgXact->xmin = feedbackXmin;
+ {
+ if (TransactionIdIsNormal(feedbackCatalogXmin)
+ && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+ MyPgXact->xmin = feedbackCatalogXmin;
+ else
+ MyPgXact->xmin = feedbackXmin;
+ }
}
/*
* corresponding flags is set. Typically, if you want to ignore ones with
* PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
*
+ * PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
+ * catalog_xmin of any replication slots that exist in the system when
+ * calculating the oldest xmin.
+ *
* This is used by VACUUM to decide which deleted tuples must be preserved in
* the passed in table. For shared relations backends in all databases must be
* considered, but for non-shared relations that's not required, since only
volatile PGPROC *proc = &allProcs[pgprocno];
volatile PGXACT *pgxact = &allPgXact[pgprocno];
- if (pgxact->vacuumFlags & flags)
+ if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK))
continue;
if (allDbs ||
/*
* Check whether there are replication slots requiring an older xmin.
*/
- if (TransactionIdIsValid(replication_slot_xmin) &&
+ if (!(flags & PROCARRAY_SLOTS_XMIN) &&
+ TransactionIdIsValid(replication_slot_xmin) &&
NormalTransactionIdPrecedes(replication_slot_xmin, result))
result = replication_slot_xmin;
* possible. We need to do so if we're computing the global limit (rel =
* NULL) or if the passed relation is a catalog relation of some kind.
*/
- if ((rel == NULL ||
+ if (!(flags & PROCARRAY_SLOTS_XMIN) &&
+ (rel == NULL ||
RelationIsAccessibleInLogicalDecoding(rel)) &&
TransactionIdIsValid(replication_slot_catalog_xmin) &&
NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
*
* Note: If you modify these flags, you need to modify PROCARRAY_XXX flags
* in src/include/storage/procarray.h.
+ *
+ * PROC_RESERVED may later be assigned for use in vacuumFlags, but its value is
+ * used for PROCARRAY_SLOTS_XMIN in procarray.h, so GetOldestXmin won't be able
+ * to match and ignore processes with this flag set.
*/
#define PROC_IS_AUTOVACUUM 0x01 /* is it an autovac worker? */
#define PROC_IN_VACUUM 0x02 /* currently running lazy vacuum */
#define PROC_VACUUM_FOR_WRAPAROUND 0x08 /* set by autovac only */
#define PROC_IN_LOGICAL_DECODING 0x10 /* currently doing logical
* decoding outside xact */
+#define PROC_RESERVED 0x20 /* reserved for procarray */
/* flags reset at EOXact */
#define PROC_VACUUM_STATE_MASK \
#define PROCARRAY_LOGICAL_DECODING_FLAG 0x10 /* currently doing logical
* decoding outside xact */
+#define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin,
+ * catalog_xmin */
+/*
+ * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
+ * PGXACT->vacuumFlags. Other flags are used for different purposes and
+ * have no corresponding PROC flag equivalent.
+ */
+#define PROCARRAY_PROC_FLAGS_MASK (PROCARRAY_VACUUM_FLAG | \
+ PROCARRAY_ANALYZE_FLAG | \
+ PROCARRAY_LOGICAL_DECODING_FLAG)
+
/* Use the following flags as an input "flags" to GetOldestXmin function */
/* Consider all backends except for logical decoding ones which manage xmin separately */
#define PROCARRAY_FLAGS_DEFAULT PROCARRAY_LOGICAL_DECODING_FLAG
use PostgresNode;
use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 10;
use RecursiveCopy;
use File::Copy;
use IPC::Run ();
# Initialize master node
my $node_master = get_new_node('master');
$node_master->init(allows_streaming => 1, has_archiving => 1);
-$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
-$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
-$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
-$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->append_conf('postgresql.conf', q[
+wal_level = 'logical'
+max_replication_slots = 3
+max_wal_senders = 2
+log_min_messages = 'debug2'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+]);
$node_master->dump_info;
$node_master->start;
my $backup_name = 'b1';
$node_master->backup_fs_hot($backup_name);
+$node_master->safe_psql('postgres',
+ q[SELECT pg_create_physical_replication_slot('phys_slot');]);
+
my $node_replica = get_new_node('replica');
$node_replica->init_from_backup(
$node_master, $backup_name,
has_streaming => 1,
has_restoring => 1);
+$node_replica->append_conf(
+ 'recovery.conf', q[primary_slot_name = 'phys_slot']);
+
$node_replica->start;
$node_master->safe_psql('postgres',
is($stdout, 'before_basebackup',
'Expected to find only slot before_basebackup on replica');
+# Examine the physical slot the replica uses to stream changes
+# from the master to make sure its hot_standby_feedback
+# has locked in a catalog_xmin on the physical slot, and that
+# any xmin is >= the catalog_xmin
+$node_master->poll_query_until('postgres', q[
+ SELECT catalog_xmin IS NOT NULL
+ FROM pg_replication_slots
+ WHERE slot_name = 'phys_slot'
+ ]);
+my $phys_slot = $node_master->slot('phys_slot');
+isnt($phys_slot->{'xmin'}, '',
+ 'xmin assigned on physical slot of master');
+isnt($phys_slot->{'catalog_xmin'}, '',
+ 'catalog_xmin assigned on physical slot of master');
+# Ignore wrap-around here, we're on a new cluster:
+cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
+ 'xmin on physical slot must not be lower than catalog_xmin');
+
# Boom, crash
$node_master->stop('immediate');