Report catalog_xmin separately in hot_standby_feedback
authorSimon Riggs <simon@2ndQuadrant.com>
Sat, 25 Mar 2017 14:07:27 +0000 (14:07 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Sat, 25 Mar 2017 14:07:27 +0000 (14:07 +0000)
If the upstream walsender is using a physical replication slot, store the
catalog_xmin in the slot's catalog_xmin field. If the upstream doesn't use a
slot and has only a PGPROC entry behaviour doesn't change, as we store the
combined xmin and catalog_xmin in the PGPROC entry.

Author: Craig Ringer

doc/src/sgml/protocol.sgml
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/backend/storage/ipc/procarray.c
src/include/storage/proc.h
src/include/storage/procarray.h
src/test/recovery/t/010_logical_decoding_timelines.pl

index 48ca4140312e2e66524a7863408bc9e53cd06bb1..b3a50261c333206901364fdce25acb12a8e6e565 100644 (file)
@@ -1916,10 +1916,11 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current xmin. This may be 0, if the standby is
-          sending notification that Hot Standby feedback will no longer
-          be sent on this connection. Later non-zero messages may
-          reinitiate the feedback mechanism.
+          The standby's current global xmin, excluding the catalog_xmin from any
+          replication slots. If both this value and the following
+          catalog_xmin are 0 this is treated as a notification that Hot Standby
+          feedback will no longer be sent on this connection. Later non-zero
+          messages may reinitiate the feedback mechanism.
       </para>
       </listitem>
       </varlistentry>
@@ -1929,7 +1930,29 @@ The commands accepted in walsender mode are:
       </term>
       <listitem>
       <para>
-          The standby's current epoch.
+          The epoch of the global xmin xid on the standby.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The lowest catalog_xmin of any replication slots on the standby. Set to 0
+          if no catalog_xmin exists on the standby or if hot standby feedback is being
+          disabled.
+      </para>
+      </listitem>
+      </varlistentry>
+      <varlistentry>
+      <term>
+          Int32
+      </term>
+      <listitem>
+      <para>
+          The epoch of the catalog_xmin xid on the standby.
       </para>
       </listitem>
       </varlistentry>
index 31c567b37ef4f9584ec72b00b8012227a141050e..771ac305c348221ae6accc2a6467c6a28b62e79f 100644 (file)
@@ -1175,8 +1175,8 @@ XLogWalRcvSendHSFeedback(bool immed)
 {
        TimestampTz now;
        TransactionId nextXid;
-       uint32          nextEpoch;
-       TransactionId xmin;
+       uint32          xmin_epoch, catalog_xmin_epoch;
+       TransactionId xmin, catalog_xmin;
        static TimestampTz sendTime = 0;
        /* initially true so we always send at least one feedback message */
        static bool master_has_standby_xmin = true;
@@ -1221,29 +1221,54 @@ XLogWalRcvSendHSFeedback(bool immed)
         * everything else has been checked.
         */
        if (hot_standby_feedback)
-               xmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT);
+       {
+               TransactionId slot_xmin;
+
+               /*
+                * Usually GetOldestXmin() would include both global replication slot
+                * xmin and catalog_xmin in its calculations, but we want to derive
+                * separate values for each of those. So we ask for an xmin that
+                * excludes the catalog_xmin.
+                */
+               xmin = GetOldestXmin(NULL,
+                                                        PROCARRAY_FLAGS_DEFAULT|PROCARRAY_SLOTS_XMIN);
+
+               ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
+
+               if (TransactionIdIsValid(slot_xmin) &&
+                       TransactionIdPrecedes(slot_xmin, xmin))
+                       xmin = slot_xmin;
+       }
        else
+       {
                xmin = InvalidTransactionId;
+               catalog_xmin = InvalidTransactionId;
+       }
 
        /*
         * Get epoch and adjust if nextXid and oldestXmin are different sides of
         * the epoch boundary.
         */
-       GetNextXidAndEpoch(&nextXid, &nextEpoch);
+       GetNextXidAndEpoch(&nextXid, &xmin_epoch);
+       catalog_xmin_epoch = xmin_epoch;
        if (nextXid < xmin)
-               nextEpoch--;
+               xmin_epoch --;
+       if (nextXid < catalog_xmin)
+               catalog_xmin_epoch --;
 
-       elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
-                xmin, nextEpoch);
+       elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
+                xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
 
        /* Construct the message and send it. */
        resetStringInfo(&reply_message);
        pq_sendbyte(&reply_message, 'h');
        pq_sendint64(&reply_message, GetCurrentTimestamp());
        pq_sendint(&reply_message, xmin, 4);
-       pq_sendint(&reply_message, nextEpoch, 4);
+       pq_sendint(&reply_message, xmin_epoch, 4);
+       pq_sendint(&reply_message, catalog_xmin, 4);
+       pq_sendint(&reply_message, catalog_xmin_epoch, 4);
        walrcv_send(wrconn, reply_message.data, reply_message.len);
-       if (TransactionIdIsValid(xmin))
+       if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
                master_has_standby_xmin = true;
        else
                master_has_standby_xmin = false;
index a29d0e7cf4b1aabab640bc0b2d358e360b28b278..59ae22df8c8d72305df977b0239774b1589d2599 100644 (file)
@@ -242,6 +242,7 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
+static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -1756,7 +1757,7 @@ ProcessStandbyReplyMessage(void)
 
 /* compute new replication slot xmin horizon if needed */
 static void
-PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
+PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 {
        bool            changed = false;
        ReplicationSlot *slot = MyReplicationSlot;
@@ -1777,6 +1778,14 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
                slot->data.xmin = feedbackXmin;
                slot->effective_xmin = feedbackXmin;
        }
+       if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
+               !TransactionIdIsNormal(feedbackCatalogXmin) ||
+               TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
+       {
+               changed = true;
+               slot->data.catalog_xmin = feedbackCatalogXmin;
+               slot->effective_catalog_xmin = feedbackCatalogXmin;
+       }
        SpinLockRelease(&slot->mutex);
 
        if (changed)
@@ -1786,60 +1795,93 @@ PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
        }
 }
 
+/*
+ * Check that the provided xmin/epoch are sane, that is, not in the future
+ * and not so far back as to be already wrapped around.
+ *
+ * Epoch of nextXid should be same as standby, or if the counter has
+ * wrapped, then one greater than standby.
+ *
+ * This check doesn't care about whether clog exists for these xids
+ * at all.
+ */
+static bool
+TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
+{
+       TransactionId nextXid;
+       uint32          nextEpoch;
+
+       GetNextXidAndEpoch(&nextXid, &nextEpoch);
+
+       if (xid <= nextXid)
+       {
+               if (epoch != nextEpoch)
+                       return false;
+       }
+       else
+       {
+               if (epoch + 1 != nextEpoch)
+                       return false;
+       }
+
+       if (!TransactionIdPrecedesOrEquals(xid, nextXid))
+               return false;                           /* epoch OK, but it's wrapped around */
+
+       return true;
+}
+
 /*
  * Hot Standby feedback
  */
 static void
 ProcessStandbyHSFeedbackMessage(void)
 {
-       TransactionId nextXid;
-       uint32          nextEpoch;
        TransactionId feedbackXmin;
        uint32          feedbackEpoch;
+       TransactionId feedbackCatalogXmin;
+       uint32          feedbackCatalogEpoch;
 
        /*
         * Decipher the reply message. The caller already consumed the msgtype
-        * byte.
+        * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
+        * of this message.
         */
        (void) pq_getmsgint64(&reply_message);          /* sendTime; not used ATM */
        feedbackXmin = pq_getmsgint(&reply_message, 4);
        feedbackEpoch = pq_getmsgint(&reply_message, 4);
+       feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
+       feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
 
-       elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
+       elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
                 feedbackXmin,
-                feedbackEpoch);
+                feedbackEpoch,
+                feedbackCatalogXmin,
+                feedbackCatalogEpoch);
 
-       /* Unset WalSender's xmin if the feedback message value is invalid */
-       if (!TransactionIdIsNormal(feedbackXmin))
+       /*
+        * Unset WalSender's xmins if the feedback message values are invalid.
+        * This happens when the downstream turned hot_standby_feedback off.
+        */
+       if (!TransactionIdIsNormal(feedbackXmin)
+               && !TransactionIdIsNormal(feedbackCatalogXmin))
        {
                MyPgXact->xmin = InvalidTransactionId;
                if (MyReplicationSlot != NULL)
-                       PhysicalReplicationSlotNewXmin(feedbackXmin);
+                       PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
                return;
        }
 
        /*
         * Check that the provided xmin/epoch are sane, that is, not in the future
         * and not so far back as to be already wrapped around.  Ignore if not.
-        *
-        * Epoch of nextXid should be same as standby, or if the counter has
-        * wrapped, then one greater than standby.
         */
-       GetNextXidAndEpoch(&nextXid, &nextEpoch);
-
-       if (feedbackXmin <= nextXid)
-       {
-               if (feedbackEpoch != nextEpoch)
-                       return;
-       }
-       else
-       {
-               if (feedbackEpoch + 1 != nextEpoch)
-                       return;
-       }
+       if (TransactionIdIsNormal(feedbackXmin) &&
+               !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
+               return;
 
-       if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
-               return;                                 /* epoch OK, but it's wrapped around */
+       if (TransactionIdIsNormal(feedbackCatalogXmin) &&
+               !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
+               return;
 
        /*
         * Set the WalSender's xmin equal to the standby's requested xmin, so that
@@ -1864,15 +1906,23 @@ ProcessStandbyHSFeedbackMessage(void)
         * already since a VACUUM could have just finished calling GetOldestXmin.)
         *
         * If we're using a replication slot we reserve the xmin via that,
-        * otherwise via the walsender's PGXACT entry.
+        * otherwise via the walsender's PGXACT entry. We can only track the
+        * catalog xmin separately when using a slot, so we store the least
+        * of the two provided when not using a slot.
         *
         * XXX: It might make sense to generalize the ephemeral slot concept and
         * always use the slot mechanism to handle the feedback xmin.
         */
        if (MyReplicationSlot != NULL)          /* XXX: persistency configurable? */
-               PhysicalReplicationSlotNewXmin(feedbackXmin);
+               PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
        else
-               MyPgXact->xmin = feedbackXmin;
+       {
+               if (TransactionIdIsNormal(feedbackCatalogXmin)
+                       && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
+                       MyPgXact->xmin = feedbackCatalogXmin;
+               else
+                       MyPgXact->xmin = feedbackXmin;
+       }
 }
 
 /*
index 40c3247d4b8728c8cafd8b420895247cd700fe54..7c2e1e1c85421b1a82633088c99bd9080107ecbb 100644 (file)
@@ -1264,6 +1264,10 @@ TransactionIdIsActive(TransactionId xid)
  * corresponding flags is set. Typically, if you want to ignore ones with
  * PROC_IN_VACUUM flag, you can use PROCARRAY_FLAGS_VACUUM.
  *
+ * PROCARRAY_SLOTS_XMIN causes GetOldestXmin to ignore the xmin and
+ * catalog_xmin of any replication slots that exist in the system when
+ * calculating the oldest xmin.
+ *
  * This is used by VACUUM to decide which deleted tuples must be preserved in
  * the passed in table. For shared relations backends in all databases must be
  * considered, but for non-shared relations that's not required, since only
@@ -1342,7 +1346,7 @@ GetOldestXmin(Relation rel, int flags)
                volatile PGPROC *proc = &allProcs[pgprocno];
                volatile PGXACT *pgxact = &allPgXact[pgprocno];
 
-               if (pgxact->vacuumFlags & flags)
+               if (pgxact->vacuumFlags & (flags & PROCARRAY_PROC_FLAGS_MASK))
                        continue;
 
                if (allDbs ||
@@ -1418,7 +1422,8 @@ GetOldestXmin(Relation rel, int flags)
        /*
         * Check whether there are replication slots requiring an older xmin.
         */
-       if (TransactionIdIsValid(replication_slot_xmin) &&
+       if (!(flags & PROCARRAY_SLOTS_XMIN) &&
+               TransactionIdIsValid(replication_slot_xmin) &&
                NormalTransactionIdPrecedes(replication_slot_xmin, result))
                result = replication_slot_xmin;
 
@@ -1428,7 +1433,8 @@ GetOldestXmin(Relation rel, int flags)
         * possible. We need to do so if we're computing the global limit (rel =
         * NULL) or if the passed relation is a catalog relation of some kind.
         */
-       if ((rel == NULL ||
+       if (!(flags & PROCARRAY_SLOTS_XMIN) &&
+               (rel == NULL ||
                 RelationIsAccessibleInLogicalDecoding(rel)) &&
                TransactionIdIsValid(replication_slot_catalog_xmin) &&
                NormalTransactionIdPrecedes(replication_slot_catalog_xmin, result))
index 945dd1d592f27a4be8fcddf61e5f0d3e23123bfa..1b345faa2dcbba9bf62711baff878bfb722f0e98 100644 (file)
@@ -44,6 +44,10 @@ struct XidCache
  *
  * Note: If you modify these flags, you need to modify PROCARRAY_XXX flags
  * in src/include/storage/procarray.h.
+ *
+ * PROC_RESERVED may later be assigned for use in vacuumFlags, but its value is
+ * used for PROCARRAY_SLOTS_XMIN in procarray.h, so GetOldestXmin won't be able
+ * to match and ignore processes with this flag set.
  */
 #define                PROC_IS_AUTOVACUUM      0x01    /* is it an autovac worker? */
 #define                PROC_IN_VACUUM          0x02    /* currently running lazy vacuum */
@@ -51,6 +55,7 @@ struct XidCache
 #define                PROC_VACUUM_FOR_WRAPAROUND      0x08    /* set by autovac only */
 #define                PROC_IN_LOGICAL_DECODING        0x10    /* currently doing logical
                                                                                                 * decoding outside xact */
+#define                PROC_RESERVED                           0x20    /* reserved for procarray */
 
 /* flags reset at EOXact */
 #define                PROC_VACUUM_STATE_MASK \
index c8e1ae517c98e6c04ca6be5efabe4f0c250c5d94..9b42e4952433cf3980674925c03361dc2364c96a 100644 (file)
 #define                PROCARRAY_LOGICAL_DECODING_FLAG 0x10    /* currently doing logical
                                                                                                         * decoding outside xact */
 
+#define                PROCARRAY_SLOTS_XMIN                    0x20    /* replication slot xmin,
+                                                                                                        * catalog_xmin */
+/*
+ * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching
+ * PGXACT->vacuumFlags. Other flags are used for different purposes and
+ * have no corresponding PROC flag equivalent.
+ */
+#define                PROCARRAY_PROC_FLAGS_MASK       (PROCARRAY_VACUUM_FLAG | \
+                                                                                PROCARRAY_ANALYZE_FLAG | \
+                                                                                PROCARRAY_LOGICAL_DECODING_FLAG)
+
 /* Use the following flags as an input "flags" to GetOldestXmin function */
 /* Consider all backends except for logical decoding ones which manage xmin separately */
 #define                PROCARRAY_FLAGS_DEFAULT                 PROCARRAY_LOGICAL_DECODING_FLAG
index 09830dc39cefab302c12cdba4537b86a6f331e0c..4561a06143b48f4f24c27a762b4179911210bebe 100644 (file)
@@ -20,7 +20,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 7;
+use Test::More tests => 10;
 use RecursiveCopy;
 use File::Copy;
 use IPC::Run ();
@@ -31,10 +31,14 @@ my ($stdout, $stderr, $ret);
 # Initialize master node
 my $node_master = get_new_node('master');
 $node_master->init(allows_streaming => 1, has_archiving => 1);
-$node_master->append_conf('postgresql.conf', "wal_level = 'logical'\n");
-$node_master->append_conf('postgresql.conf', "max_replication_slots = 2\n");
-$node_master->append_conf('postgresql.conf', "max_wal_senders = 2\n");
-$node_master->append_conf('postgresql.conf', "log_min_messages = 'debug2'\n");
+$node_master->append_conf('postgresql.conf', q[
+wal_level = 'logical'
+max_replication_slots = 3
+max_wal_senders = 2
+log_min_messages = 'debug2'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+]);
 $node_master->dump_info;
 $node_master->start;
 
@@ -51,11 +55,17 @@ $node_master->safe_psql('postgres', 'CHECKPOINT;');
 my $backup_name = 'b1';
 $node_master->backup_fs_hot($backup_name);
 
+$node_master->safe_psql('postgres',
+       q[SELECT pg_create_physical_replication_slot('phys_slot');]);
+
 my $node_replica = get_new_node('replica');
 $node_replica->init_from_backup(
        $node_master, $backup_name,
        has_streaming => 1,
        has_restoring => 1);
+$node_replica->append_conf(
+       'recovery.conf', q[primary_slot_name = 'phys_slot']);
+
 $node_replica->start;
 
 $node_master->safe_psql('postgres',
@@ -71,6 +81,24 @@ $stdout = $node_replica->safe_psql('postgres',
 is($stdout, 'before_basebackup',
        'Expected to find only slot before_basebackup on replica');
 
+# Examine the physical slot the replica uses to stream changes
+# from the master to make sure its hot_standby_feedback
+# has locked in a catalog_xmin on the physical slot, and that
+# any xmin is < the catalog_xmin
+$node_master->poll_query_until('postgres', q[
+       SELECT catalog_xmin IS NOT NULL
+       FROM pg_replication_slots
+       WHERE slot_name = 'phys_slot'
+       ]);
+my $phys_slot = $node_master->slot('phys_slot');
+isnt($phys_slot->{'xmin'}, '',
+       'xmin assigned on physical slot of master');
+isnt($phys_slot->{'catalog_xmin'}, '',
+       'catalog_xmin assigned on physical slot of master');
+# Ignore wrap-around here, we're on a new cluster:
+cmp_ok($phys_slot->{'xmin'}, '>=', $phys_slot->{'catalog_xmin'},
+          'xmin on physical slot must not be lower than catalog_xmin');
+
 # Boom, crash
 $node_master->stop('immediate');