Replication lag tracking for walsenders
authorSimon Riggs <simon@2ndQuadrant.com>
Thu, 23 Mar 2017 14:05:28 +0000 (14:05 +0000)
committerSimon Riggs <simon@2ndQuadrant.com>
Thu, 23 Mar 2017 14:05:28 +0000 (14:05 +0000)
Adds write_lag, flush_lag and replay_lag cols to pg_stat_replication.

Implements a lag tracker module that reports the lag times based upon
measurements of the time taken for recent WAL to be written, flushed and
replayed and for the sender to hear about it. These times
represent the commit lag that was (or would have been) introduced by each
synchronous commit level, if the remote server was configured as a
synchronous standby.  For an asynchronous standby, the replay_lag column
approximates the delay before recent transactions became visible to queries.
If the standby server has entirely caught up with the sending server and
there is no more WAL activity, the most recently measured lag times will
continue to be displayed for a short time and then show NULL.

Physical replication lag tracking is automatic. Logical replication tracking
is possible but is the responsibility of the logical decoding plugin.
Tracking is a private module operating within each walsender individually,
with values reported to shared memory. Module not used outside of walsender.

Design and code is good enough now to commit - kudos to the author.
In many ways a difficult topic, with important and subtle behaviour so this
shoudl be expected to generate discussion and multiple open items: Test now!

Author: Thomas Munro, following designs by Fujii Masao and Simon Riggs
Review: Simon Riggs, Ian Barwick and Craig Ringer

doc/src/sgml/monitoring.sgml
src/backend/access/transam/xlog.c
src/backend/catalog/system_views.sql
src/backend/replication/walsender.c
src/include/catalog/pg_proc.h
src/include/replication/logical.h
src/include/replication/walsender_private.h
src/test/regress/expected/rules.out

index eb6f486677350f14dfa7f609dfaed8a4666e11c5..356a2f0c4c4951a3f51e58553f57ba7053461975 100644 (file)
@@ -1695,6 +1695,36 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
      <entry>Last transaction log position replayed into the database on this
       standby server</entry>
     </row>
+    <row>
+     <entry><structfield>write_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Time elapsed between flushing recent WAL locally and receiving
+      notification that this standby server has written it (but not yet
+      flushed it or applied it).  This can be used to gauge the delay that
+      <literal>synchronous_commit</literal> level
+      <literal>remote_write</literal> incurred while committing if this
+      server was configured as a synchronous standby.</entry>
+    </row>
+    <row>
+     <entry><structfield>flush_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Time elapsed between flushing recent WAL locally and receiving
+      notification that this standby server has written and flushed it
+      (but not yet applied it).  This can be used to gauge the delay that
+      <literal>synchronous_commit</literal> level
+      <literal>remote_flush</literal> incurred while committing if this
+      server was configured as a synchronous standby.</entry>
+    </row>
+    <row>
+     <entry><structfield>replay_lag</></entry>
+     <entry><type>interval</></entry>
+     <entry>Time elapsed between flushing recent WAL locally and receiving
+      notification that this standby server has written, flushed and
+      applied it.  This can be used to gauge the delay that
+      <literal>synchronous_commit</literal> level
+      <literal>remote_apply</literal> incurred while committing if this
+      server was configured as a synchronous standby.</entry>
+    </row>
     <row>
      <entry><structfield>sync_priority</></entry>
      <entry><type>integer</></entry>
@@ -1745,6 +1775,45 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
    listed; no information is available about downstream standby servers.
   </para>
 
+  <para>
+   The lag times reported in the <structname>pg_stat_replication</structname>
+   view are measurements of the time taken for recent WAL to be written,
+   flushed and replayed and for the sender to know about it.  These times
+   represent the commit delay that was (or would have been) introduced by each
+   synchronous commit level, if the remote server was configured as a
+   synchronous standby.  For an asynchronous standby, the
+   <structfield>replay_lag</structfield> column approximates the delay
+   before recent transactions became visible to queries.  If the standby
+   server has entirely caught up with the sending server and there is no more
+   WAL activity, the most recently measured lag times will continue to be
+   displayed for a short time and then show NULL.
+  </para>
+
+  <para>
+   Lag times work automatically for physical replication. Logical decoding
+   plugins may optionally emit tracking messages; if they do not, the tracking
+   mechanism will simply display NULL lag.
+  </para>
+
+  <note>
+   <para>
+    The reported lag times are not predictions of how long it will take for
+    the standby to catch up with the sending server assuming the current
+    rate of replay.  Such a system would show similar times while new WAL is
+    being generated, but would differ when the sender becomes idle.  In
+    particular, when the standby has caught up completely, 
+    <structname>pg_stat_replication</structname> shows the time taken to
+    write, flush and replay the most recent reported WAL position rather than
+    zero as some users might expect.  This is consistent with the goal of
+    measuring synchronous commit and transaction visibility delays for
+    recent write transactions.
+    To reduce confusion for users expecting a different model of lag, the
+    lag columns revert to NULL after a short time on a fully replayed idle
+    system. Monitoring systems should choose whether to represent this
+    as missing data, zero or continue to display the last known value.
+   </para>
+  </note>
+
   <table id="pg-stat-wal-receiver-view" xreflabel="pg_stat_wal_receiver">
    <title><structname>pg_stat_wal_receiver</structname> View</title>
    <tgroup cols="3">
index 3924738a336d2ad6a4f31a62182ebc54e0e081e4..de1937e013d7e7b4415b0c84a90408b026b01e21 100644 (file)
@@ -11555,6 +11555,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 {
    static TimestampTz last_fail_time = 0;
    TimestampTz now;
+   bool        streaming_reply_sent = false;
 
    /*-------
     * Standby mode is implemented by a state machine:
@@ -11877,6 +11878,19 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                        break;
                    }
 
+                   /*
+                    * Since we have replayed everything we have received so
+                    * far and are about to start waiting for more WAL, let's
+                    * tell the upstream server our replay location now so
+                    * that pg_stat_replication doesn't show stale
+                    * information.
+                    */
+                   if (!streaming_reply_sent)
+                   {
+                       WalRcvForceReply();
+                       streaming_reply_sent = true;
+                   }
+
                    /*
                     * Wait for more WAL to arrive. Time out after 5 seconds
                     * to react to a trigger file promptly.
index 5723714fb972489717ff62ca74bf501966ab713b..80d14296de22bac8073bdcb00ec1832dbebf38de 100644 (file)
@@ -705,6 +705,9 @@ CREATE VIEW pg_stat_replication AS
             W.write_location,
             W.flush_location,
             W.replay_location,
+            W.write_lag,
+            W.flush_lag,
+            W.replay_lag,
             W.sync_priority,
             W.sync_state
     FROM pg_stat_get_activity(NULL) AS S
index c6ba916c49b51382c25f1d29425339aee6dbdbb4..a29d0e7cf4b1aabab640bc0b2d358e360b28b278 100644 (file)
@@ -190,6 +190,26 @@ static volatile sig_atomic_t replication_active = false;
 static LogicalDecodingContext *logical_decoding_ctx = NULL;
 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
 
+/* A sample associating a log position with the time it was written. */
+typedef struct
+{
+   XLogRecPtr lsn;
+   TimestampTz time;
+} WalTimeSample;
+
+/* The size of our buffer of time samples. */
+#define LAG_TRACKER_BUFFER_SIZE 8192
+
+/* A mechanism for tracking replication lag. */
+static struct
+{
+   XLogRecPtr last_lsn;
+   WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
+   int write_head;
+   int read_heads[NUM_SYNC_REP_WAIT_MODE];
+   WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
+} LagTracker;
+
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
@@ -221,6 +241,7 @@ static long WalSndComputeSleeptime(TimestampTz now);
 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 
 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
 
@@ -246,6 +267,9 @@ InitWalSender(void)
     */
    MarkPostmasterChildWalSender();
    SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+   /* Initialize empty timestamp buffer for lag tracking. */
+   memset(&LagTracker, 0, sizeof(LagTracker));
 }
 
 /*
@@ -1646,6 +1670,13 @@ ProcessStandbyReplyMessage(void)
                flushPtr,
                applyPtr;
    bool        replyRequested;
+   TimeOffset  writeLag,
+               flushLag,
+               applyLag;
+   bool        clearLagTimes;
+   TimestampTz now;
+
+   static bool fullyAppliedLastTime = false;
 
    /* the caller already consumed the msgtype byte */
    writePtr = pq_getmsgint64(&reply_message);
@@ -1660,6 +1691,30 @@ ProcessStandbyReplyMessage(void)
         (uint32) (applyPtr >> 32), (uint32) applyPtr,
         replyRequested ? " (reply requested)" : "");
 
+   /* See if we can compute the round-trip lag for these positions. */
+   now = GetCurrentTimestamp();
+   writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
+   flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
+   applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
+
+   /*
+    * If the standby reports that it has fully replayed the WAL in two
+    * consecutive reply messages, then the second such message must result
+    * from wal_receiver_status_interval expiring on the standby.  This is a
+    * convenient time to forget the lag times measured when it last
+    * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
+    * until more WAL traffic arrives.
+    */
+   clearLagTimes = false;
+   if (applyPtr == sentPtr)
+   {
+       if (fullyAppliedLastTime)
+           clearLagTimes = true;
+       fullyAppliedLastTime = true;
+   }
+   else
+       fullyAppliedLastTime = false;
+
    /* Send a reply if the standby requested one. */
    if (replyRequested)
        WalSndKeepalive(false);
@@ -1675,6 +1730,12 @@ ProcessStandbyReplyMessage(void)
        walsnd->write = writePtr;
        walsnd->flush = flushPtr;
        walsnd->apply = applyPtr;
+       if (writeLag != -1 || clearLagTimes)
+           walsnd->writeLag = writeLag;
+       if (flushLag != -1 || clearLagTimes)
+           walsnd->flushLag = flushLag;
+       if (applyLag != -1 || clearLagTimes)
+           walsnd->applyLag = applyLag;
        SpinLockRelease(&walsnd->mutex);
    }
 
@@ -2063,6 +2124,9 @@ InitWalSenderSlot(void)
            walsnd->write = InvalidXLogRecPtr;
            walsnd->flush = InvalidXLogRecPtr;
            walsnd->apply = InvalidXLogRecPtr;
+           walsnd->writeLag = -1;
+           walsnd->flushLag = -1;
+           walsnd->applyLag = -1;
            walsnd->state = WALSNDSTATE_STARTUP;
            walsnd->latch = &MyProc->procLatch;
            SpinLockRelease(&walsnd->mutex);
@@ -2389,6 +2453,32 @@ XLogSendPhysical(void)
        SendRqstPtr = GetFlushRecPtr();
    }
 
+   /*
+    * Record the current system time as an approximation of the time at which
+    * this WAL position was written for the purposes of lag tracking.
+    *
+    * In theory we could make XLogFlush() record a time in shmem whenever WAL
+    * is flushed and we could get that time as well as the LSN when we call
+    * GetFlushRecPtr() above (and likewise for the cascading standby
+    * equivalent), but rather than putting any new code into the hot WAL path
+    * it seems good enough to capture the time here.  We should reach this
+    * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
+    * may take some time, we read the WAL flush pointer and take the time
+    * very close to together here so that we'll get a later position if it
+    * is still moving.
+    *
+    * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
+    * this gives us a cheap approximation for the WAL flush time for this
+    * LSN.
+    *
+    * Note that the LSN is not necessarily the LSN for the data contained in
+    * the present message; it's the end of the the WAL, which might be
+    * further ahead.  All the lag tracking machinery cares about is finding
+    * out when that arbitrary LSN is eventually reported as written, flushed
+    * and applied, so that it can measure the elapsed time.
+    */
+   LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
+
    /*
     * If this is a historic timeline and we've reached the point where we
     * forked to the next timeline, stop streaming.
@@ -2543,6 +2633,11 @@ XLogSendLogical(void)
 
    if (record != NULL)
    {
+       /*
+        * Note the lack of any call to LagTrackerWrite() which is the responsibility
+        * of the logical decoding plugin. Response messages are handled normally,
+        * so this responsibility does not extend to needing to call LagTrackerRead().
+        */
        LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
        sentPtr = logical_decoding_ctx->reader->EndRecPtr;
@@ -2839,6 +2934,17 @@ WalSndGetStateString(WalSndState state)
    return "UNKNOWN";
 }
 
+static Interval *
+offset_to_interval(TimeOffset offset)
+{
+   Interval *result = palloc(sizeof(Interval));
+
+   result->month = 0;
+   result->day = 0;
+   result->time = offset;
+
+   return result;
+}
 
 /*
  * Returns activity of walsenders, including pids and xlog locations sent to
@@ -2847,7 +2953,7 @@ WalSndGetStateString(WalSndState state)
 Datum
 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_WAL_SENDERS_COLS   8
+#define PG_STAT_GET_WAL_SENDERS_COLS   11
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
@@ -2895,6 +3001,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        XLogRecPtr  write;
        XLogRecPtr  flush;
        XLogRecPtr  apply;
+       TimeOffset  writeLag;
+       TimeOffset  flushLag;
+       TimeOffset  applyLag;
        int         priority;
        WalSndState state;
        Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -2909,6 +3018,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
        write = walsnd->write;
        flush = walsnd->flush;
        apply = walsnd->apply;
+       writeLag = walsnd->writeLag;
+       flushLag = walsnd->flushLag;
+       applyLag = walsnd->applyLag;
        priority = walsnd->sync_standby_priority;
        SpinLockRelease(&walsnd->mutex);
 
@@ -2950,7 +3062,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
             */
            priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
 
-           values[6] = Int32GetDatum(priority);
+           if (writeLag < 0)
+               nulls[6] = true;
+           else
+               values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
+
+           if (flushLag < 0)
+               nulls[7] = true;
+           else
+               values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
+
+           if (applyLag < 0)
+               nulls[8] = true;
+           else
+               values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
+
+           values[9] = Int32GetDatum(priority);
 
            /*
             * More easily understood version of standby state. This is purely
@@ -2964,12 +3091,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
             * states. We report just "quorum" for them.
             */
            if (priority == 0)
-               values[7] = CStringGetTextDatum("async");
+               values[10] = CStringGetTextDatum("async");
            else if (list_member_int(sync_standbys, i))
-               values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+               values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
                    CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
            else
-               values[7] = CStringGetTextDatum("potential");
+               values[10] = CStringGetTextDatum("potential");
        }
 
        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -3037,3 +3164,143 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
            WalSndShutdown();
    }
 }
+
+/*
+ * Record the end of the WAL and the time it was flushed locally, so that
+ * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
+ * eventually reported to have been written, flushed and applied by the
+ * standby in a reply message.
+ * Exported to allow logical decoding plugins to call this when they choose.
+ */
+void
+LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
+{
+   bool buffer_full;
+   int new_write_head;
+   int i;
+
+   if (!am_walsender)
+       return;
+
+   /*
+    * If the lsn hasn't advanced since last time, then do nothing.  This way
+    * we only record a new sample when new WAL has been written.
+    */
+   if (LagTracker.last_lsn == lsn)
+       return;
+   LagTracker.last_lsn = lsn;
+
+   /*
+    * If advancing the write head of the circular buffer would crash into any
+    * of the read heads, then the buffer is full.  In other words, the
+    * slowest reader (presumably apply) is the one that controls the release
+    * of space.
+    */
+   new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
+   buffer_full = false;
+   for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
+   {
+       if (new_write_head == LagTracker.read_heads[i])
+           buffer_full = true;
+   }
+
+   /*
+    * If the buffer is full, for now we just rewind by one slot and overwrite
+    * the last sample, as a simple (if somewhat uneven) way to lower the
+    * sampling rate.  There may be better adaptive compaction algorithms.
+    */
+   if (buffer_full)
+   {
+       new_write_head = LagTracker.write_head;
+       if (LagTracker.write_head > 0)
+           LagTracker.write_head--;
+       else
+           LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
+   }
+
+   /* Store a sample at the current write head position. */
+   LagTracker.buffer[LagTracker.write_head].lsn = lsn;
+   LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
+   LagTracker.write_head = new_write_head;
+}
+
+/*
+ * Find out how much time has elapsed between the moment WAL position 'lsn'
+ * (or the highest known earlier LSN) was flushed locally and the time 'now'.
+ * We have a separate read head for each of the reported LSN locations we
+ * receive in replies from standby; 'head' controls which read head is
+ * used.  Whenever a read head crosses an LSN which was written into the
+ * lag buffer with LagTrackerWrite, we can use the associated timestamp to
+ * find out the time this LSN (or an earlier one) was flushed locally, and
+ * therefore compute the lag.
+ *
+ * Return -1 if no new sample data is available, and otherwise the elapsed
+ * time in microseconds.
+ */
+static TimeOffset
+LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
+{
+   TimestampTz time = 0;
+
+   /* Read all unread samples up to this LSN or end of buffer. */
+   while (LagTracker.read_heads[head] != LagTracker.write_head &&
+          LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
+   {
+       time = LagTracker.buffer[LagTracker.read_heads[head]].time;
+       LagTracker.last_read[head] =
+           LagTracker.buffer[LagTracker.read_heads[head]];
+       LagTracker.read_heads[head] =
+           (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
+   }
+
+   if (time > now)
+   {
+       /* If the clock somehow went backwards, treat as not found. */
+       return -1;
+   }
+   else if (time == 0)
+   {
+       /*
+        * We didn't cross a time.  If there is a future sample that we
+        * haven't reached yet, and we've already reached at least one sample,
+        * let's interpolate the local flushed time.  This is mainly useful for
+        * reporting a completely stuck apply position as having increasing
+        * lag, since otherwise we'd have to wait for it to eventually start
+        * moving again and cross one of our samples before we can show the
+        * lag increasing.
+        */
+       if (LagTracker.read_heads[head] != LagTracker.write_head &&
+           LagTracker.last_read[head].time != 0)
+       {
+           double fraction;
+           WalTimeSample prev = LagTracker.last_read[head];
+           WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
+
+           Assert(lsn >= prev.lsn);
+           Assert(prev.lsn < next.lsn);
+
+           if (prev.time > next.time)
+           {
+               /* If the clock somehow went backwards, treat as not found. */
+               return -1;
+           }
+
+           /* See how far we are between the previous and next samples. */
+           fraction =
+               (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
+
+           /* Scale the local flush time proportionally. */
+           time = (TimestampTz)
+               ((double) prev.time + (next.time - prev.time) * fraction);
+       }
+       else
+       {
+           /* Couldn't interpolate due to lack of data. */
+           return -1;
+       }
+   }
+
+   /* Return the elapsed time since local flush time in microseconds. */
+   Assert(time != 0);
+   return now - time;
+}
index 78c23e3f5d54ec539f63a6ad1bf1ca33f9270864..a5b415346b7a65b2bc39d06527e209509825e05f 100644 (file)
@@ -2804,7 +2804,7 @@ DATA(insert OID = 2022 (  pg_stat_get_activity            PGNSP PGUID 12 1 100 0 0 f f f
 DESCR("statistics: information about currently active backends");
 DATA(insert OID = 3318 (  pg_stat_get_progress_info              PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
 DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 (  pg_stat_get_wal_senders  PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 (  pg_stat_get_wal_senders  PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
 DESCR("statistics: information about currently active replication");
 DATA(insert OID = 3317 (  pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
 DESCR("statistics: information about WAL receiver");
index d10dd2c90af3410d00254626969d2272f7411c1f..7d6c88efe3479f9eda0b516c12bf73934a59ed66 100644 (file)
@@ -106,6 +106,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
                                      XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
+extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
+
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
 #endif
index 5e6ccfc57b2b12ca3a7b17887bf36d796b744175..2c59056cefd2e5df7b602cfe95fd260ac822bd32 100644 (file)
@@ -47,6 +47,11 @@ typedef struct WalSnd
    XLogRecPtr  flush;
    XLogRecPtr  apply;
 
+   /* Measured lag times, or -1 for unknown/none. */
+   TimeOffset  writeLag;
+   TimeOffset  flushLag;
+   TimeOffset  applyLag;
+
    /* Protects shared variables shown above. */
    slock_t     mutex;
 
index f7c3a637b5d209f58fcc07b589a11c8ecaef3466..c4c8450b830511f144d222ea46adc8d1548d2b4c 100644 (file)
@@ -1831,10 +1831,13 @@ pg_stat_replication| SELECT s.pid,
     w.write_location,
     w.flush_location,
     w.replay_location,
+    w.write_lag,
+    w.flush_lag,
+    w.replay_lag,
     w.sync_priority,
     w.sync_state
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
-     JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid)))
+     JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
     s.ssl,