<entry>Last transaction log position replayed into the database on this
standby server</entry>
</row>
+ <row>
+ <entry><structfield>write_lag</></entry>
+ <entry><type>interval</></entry>
+ <entry>Time elapsed between flushing recent WAL locally and receiving
+ notification that this standby server has written it (but not yet
+ flushed it or applied it). This can be used to gauge the delay that
+ <literal>synchronous_commit</literal> level
+ <literal>remote_write</literal> incurred while committing if this
+ server was configured as a synchronous standby.</entry>
+ </row>
+ <row>
+ <entry><structfield>flush_lag</></entry>
+ <entry><type>interval</></entry>
+ <entry>Time elapsed between flushing recent WAL locally and receiving
+ notification that this standby server has written and flushed it
+ (but not yet applied it). This can be used to gauge the delay that
+ <literal>synchronous_commit</literal> level
+ <literal>remote_flush</literal> incurred while committing if this
+ server was configured as a synchronous standby.</entry>
+ </row>
+ <row>
+ <entry><structfield>replay_lag</></entry>
+ <entry><type>interval</></entry>
+ <entry>Time elapsed between flushing recent WAL locally and receiving
+ notification that this standby server has written, flushed and
+ applied it. This can be used to gauge the delay that
+ <literal>synchronous_commit</literal> level
+ <literal>remote_apply</literal> incurred while committing if this
+ server was configured as a synchronous standby.</entry>
+ </row>
<row>
<entry><structfield>sync_priority</></entry>
<entry><type>integer</></entry>
listed; no information is available about downstream standby servers.
</para>
+ <para>
+ The lag times reported in the <structname>pg_stat_replication</structname>
+ view are measurements of the time taken for recent WAL to be written,
+ flushed and replayed and for the sender to know about it. These times
+ represent the commit delay that was (or would have been) introduced by each
+ synchronous commit level, if the remote server was configured as a
+ synchronous standby. For an asynchronous standby, the
+ <structfield>replay_lag</structfield> column approximates the delay
+ before recent transactions became visible to queries. If the standby
+ server has entirely caught up with the sending server and there is no more
+ WAL activity, the most recently measured lag times will continue to be
+ displayed for a short time and then show NULL.
+ </para>
+
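+  <para>
+   For example, the current lag for each connected standby can be
+   observed with a simple query such as the following (shown purely as
+   an illustration):
+<programlisting>
+SELECT pid, write_lag, flush_lag, replay_lag
+  FROM pg_stat_replication;
+</programlisting>
+  </para>
+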
+ <para>
+ Lag times work automatically for physical replication. Logical decoding
+ plugins may optionally emit tracking messages; if they do not, the tracking
+ mechanism will simply display NULL lag.
+ </para>
+
+ <note>
+ <para>
+ The reported lag times are not predictions of how long it will take for
+ the standby to catch up with the sending server assuming the current
+ rate of replay. Such a system would show similar times while new WAL is
+ being generated, but would differ when the sender becomes idle. In
+ particular, when the standby has caught up completely,
+ <structname>pg_stat_replication</structname> shows the time taken to
+ write, flush and replay the most recent reported WAL position rather than
+ zero as some users might expect. This is consistent with the goal of
+ measuring synchronous commit and transaction visibility delays for
+ recent write transactions.
+ To reduce confusion for users expecting a different model of lag, the
+ lag columns revert to NULL after a short time on a fully replayed idle
+    system.  Monitoring systems should choose whether to represent this
+    as missing data, as zero, or to continue displaying the last known value.
+ </para>
+ </note>
+
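+  <para>
+   For example, a monitoring system that prefers the convention of
+   treating a fully caught-up, idle standby as having zero lag (one of
+   the choices mentioned in the note above) might coalesce the NULL
+   away, as in this illustrative query:
+<programlisting>
+SELECT pid, COALESCE(replay_lag, interval '0') AS replay_lag
+  FROM pg_stat_replication;
+</programlisting>
+  </para>
+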
<table id="pg-stat-wal-receiver-view" xreflabel="pg_stat_wal_receiver">
<title><structname>pg_stat_wal_receiver</structname> View</title>
<tgroup cols="3">
static LogicalDecodingContext *logical_decoding_ctx = NULL;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
+/* A sample associating a log position with the time it was written. */
+typedef struct
+{
+ XLogRecPtr lsn;
+ TimestampTz time;
+} WalTimeSample;
+
+/* The size of our buffer of time samples. */
+#define LAG_TRACKER_BUFFER_SIZE 8192
+
+/* A mechanism for tracking replication lag. */
+static struct
+{
+	XLogRecPtr	last_lsn;	/* last LSN passed to LagTrackerWrite */
+	WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];	/* circular sample buffer */
+	int	write_head;	/* index of the next slot to write */
+	int	read_heads[NUM_SYNC_REP_WAIT_MODE];	/* one read head per wait mode */
+	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];	/* last sample each head consumed */
+} LagTracker;
+
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
*/
MarkPostmasterChildWalSender();
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+ /* Initialize empty timestamp buffer for lag tracking. */
+ memset(&LagTracker, 0, sizeof(LagTracker));
}
/*
flushPtr,
applyPtr;
bool replyRequested;
+ TimeOffset writeLag,
+ flushLag,
+ applyLag;
+ bool clearLagTimes;
+ TimestampTz now;
+
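+	/* Remembers whether the standby reported being fully caught up last time. */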
+ static bool fullyAppliedLastTime = false;
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64(&reply_message);
(uint32) (applyPtr >> 32), (uint32) applyPtr,
replyRequested ? " (reply requested)" : "");
+ /* See if we can compute the round-trip lag for these positions. */
+ now = GetCurrentTimestamp();
+ writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
+ flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
+ applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
+
+ /*
+ * If the standby reports that it has fully replayed the WAL in two
+ * consecutive reply messages, then the second such message must result
+ * from wal_receiver_status_interval expiring on the standby. This is a
+ * convenient time to forget the lag times measured when it last
+ * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
+ * until more WAL traffic arrives.
+ */
+ clearLagTimes = false;
+ if (applyPtr == sentPtr)
+ {
+ if (fullyAppliedLastTime)
+ clearLagTimes = true;
+ fullyAppliedLastTime = true;
+ }
+ else
+ fullyAppliedLastTime = false;
+
/* Send a reply if the standby requested one. */
if (replyRequested)
WalSndKeepalive(false);
walsnd->write = writePtr;
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
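+	/*
+	 * Retain the previously reported lag times unless we have fresh
+	 * samples; LagTrackerRead returns -1 when no new sample is available.
+	 * clearLagTimes forces the stale values to be cleared (set to -1).
+	 */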
+ if (writeLag != -1 || clearLagTimes)
+ walsnd->writeLag = writeLag;
+ if (flushLag != -1 || clearLagTimes)
+ walsnd->flushLag = flushLag;
+ if (applyLag != -1 || clearLagTimes)
+ walsnd->applyLag = applyLag;
SpinLockRelease(&walsnd->mutex);
}
walsnd->write = InvalidXLogRecPtr;
walsnd->flush = InvalidXLogRecPtr;
walsnd->apply = InvalidXLogRecPtr;
+ walsnd->writeLag = -1;
+ walsnd->flushLag = -1;
+ walsnd->applyLag = -1;
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
SpinLockRelease(&walsnd->mutex);
SendRqstPtr = GetFlushRecPtr();
}
+ /*
+ * Record the current system time as an approximation of the time at which
+ * this WAL position was written for the purposes of lag tracking.
+ *
+ * In theory we could make XLogFlush() record a time in shmem whenever WAL
+ * is flushed and we could get that time as well as the LSN when we call
+ * GetFlushRecPtr() above (and likewise for the cascading standby
+ * equivalent), but rather than putting any new code into the hot WAL path
+ * it seems good enough to capture the time here. We should reach this
+ * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
+ * may take some time, we read the WAL flush pointer and take the time
+ * very close together here so that we'll get a later position if it
+ * is still moving.
+ *
+ * Because LagTrackerWrite() ignores samples when the LSN hasn't advanced,
+ * this gives us a cheap approximation for the WAL flush time for this
+ * LSN.
+ *
+ * Note that the LSN is not necessarily the LSN for the data contained in
+ * the present message; it's the end of the WAL, which might be
+ * further ahead. All the lag tracking machinery cares about is finding
+ * out when that arbitrary LSN is eventually reported as written, flushed
+ * and applied, so that it can measure the elapsed time.
+ */
+ LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
+
/*
* If this is a historic timeline and we've reached the point where we
* forked to the next timeline, stop streaming.
if (record != NULL)
{
+ /*
+ * Note that we do not call LagTrackerWrite() here: that is the
+ * responsibility of the logical decoding plugin.  Reply messages from
+ * the standby are handled normally, so the plugin has no corresponding
+ * need to call LagTrackerRead().
+ */
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
sentPtr = logical_decoding_ctx->reader->EndRecPtr;
return "UNKNOWN";
}
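+
+/*
+ * Convert a TimeOffset (in microseconds) to a freshly palloc'd Interval.
+ */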
+static Interval *
+offset_to_interval(TimeOffset offset)
+{
+ Interval *result = palloc(sizeof(Interval));
+
+ result->month = 0;
+ result->day = 0;
+ result->time = offset;
+
+ return result;
+}
/*
* Returns activity of walsenders, including pids and xlog locations sent to
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 8
+#define PG_STAT_GET_WAL_SENDERS_COLS 11
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
+ TimeOffset writeLag;
+ TimeOffset flushLag;
+ TimeOffset applyLag;
int priority;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
+ writeLag = walsnd->writeLag;
+ flushLag = walsnd->flushLag;
+ applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority;
SpinLockRelease(&walsnd->mutex);
*/
priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
- values[6] = Int32GetDatum(priority);
+ if (writeLag < 0)
+ nulls[6] = true;
+ else
+ values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
+
+ if (flushLag < 0)
+ nulls[7] = true;
+ else
+ values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
+
+ if (applyLag < 0)
+ nulls[8] = true;
+ else
+ values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
+
+ values[9] = Int32GetDatum(priority);
/*
* More easily understood version of standby state. This is purely
* states. We report just "quorum" for them.
*/
if (priority == 0)
- values[7] = CStringGetTextDatum("async");
+ values[10] = CStringGetTextDatum("async");
else if (list_member_int(sync_standbys, i))
- values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
+ values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
- values[7] = CStringGetTextDatum("potential");
+ values[10] = CStringGetTextDatum("potential");
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
WalSndShutdown();
}
}
+
+/*
+ * Record the end of the WAL and the time it was flushed locally, so that
+ * LagTrackerRead can compute the elapsed time (lag) when this WAL position is
+ * eventually reported to have been written, flushed and applied by the
+ * standby in a reply message.
+ *
+ * Exported to allow logical decoding plugins to call this when they choose.
+ */
+void
+LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
+{
+ bool buffer_full;
+ int new_write_head;
+ int i;
+
+ if (!am_walsender)
+ return;
+
+ /*
+ * If the lsn hasn't advanced since last time, then do nothing. This way
+ * we only record a new sample when new WAL has been written.
+ */
+ if (LagTracker.last_lsn == lsn)
+ return;
+ LagTracker.last_lsn = lsn;
+
+ /*
+ * If advancing the write head of the circular buffer would crash into any
+ * of the read heads, then the buffer is full. In other words, the
+ * slowest reader (presumably apply) is the one that controls the release
+ * of space.
+ */
+ new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
+ buffer_full = false;
+ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
+ {
+ if (new_write_head == LagTracker.read_heads[i])
+ buffer_full = true;
+ }
+
+ /*
+ * If the buffer is full, for now we just rewind by one slot and overwrite
+ * the last sample, as a simple (if somewhat uneven) way to lower the
+ * sampling rate. There may be better adaptive compaction algorithms.
+ */
+ if (buffer_full)
+ {
+ new_write_head = LagTracker.write_head;
+ if (LagTracker.write_head > 0)
+ LagTracker.write_head--;
+ else
+ LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
+ }
+
+ /* Store a sample at the current write head position. */
+ LagTracker.buffer[LagTracker.write_head].lsn = lsn;
+ LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
+ LagTracker.write_head = new_write_head;
+}
+
+/*
+ * Find out how much time has elapsed between the moment WAL position 'lsn'
+ * (or the highest known earlier LSN) was flushed locally and the time 'now'.
+ * We have a separate read head for each of the reported LSN locations we
+ * receive in replies from the standby; 'head' controls which read head is
+ * used. Whenever a read head crosses an LSN which was written into the
+ * lag buffer with LagTrackerWrite, we can use the associated timestamp to
+ * find out the time this LSN (or an earlier one) was flushed locally, and
+ * therefore compute the lag.
+ *
+ * Return -1 if no new sample data is available, and otherwise the elapsed
+ * time in microseconds.
+ */
+static TimeOffset
+LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
+{
+	TimestampTz time = 0;	/* zero means no sample crossed yet */
+
+ /* Read all unread samples up to this LSN or end of buffer. */
+ while (LagTracker.read_heads[head] != LagTracker.write_head &&
+ LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
+ {
+ time = LagTracker.buffer[LagTracker.read_heads[head]].time;
+ LagTracker.last_read[head] =
+ LagTracker.buffer[LagTracker.read_heads[head]];
+ LagTracker.read_heads[head] =
+ (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
+ }
+
+ if (time > now)
+ {
+ /* If the clock somehow went backwards, treat as not found. */
+ return -1;
+ }
+ else if (time == 0)
+ {
+ /*
+		 * We didn't cross any samples, so no timestamp was found.  If there
+		 * is a future sample that we haven't reached yet, and we've already
+		 * consumed at least one sample, let's interpolate the local flushed
+		 * time.  This is mainly useful for
+ * reporting a completely stuck apply position as having increasing
+ * lag, since otherwise we'd have to wait for it to eventually start
+ * moving again and cross one of our samples before we can show the
+ * lag increasing.
+ */
+ if (LagTracker.read_heads[head] != LagTracker.write_head &&
+ LagTracker.last_read[head].time != 0)
+ {
+ double fraction;
+ WalTimeSample prev = LagTracker.last_read[head];
+ WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
+
+ Assert(lsn >= prev.lsn);
+ Assert(prev.lsn < next.lsn);
+
+ if (prev.time > next.time)
+ {
+ /* If the clock somehow went backwards, treat as not found. */
+ return -1;
+ }
+
+ /* See how far we are between the previous and next samples. */
+ fraction =
+ (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
+
+ /* Scale the local flush time proportionally. */
+ time = (TimestampTz)
+ ((double) prev.time + (next.time - prev.time) * fraction);
+ }
+ else
+ {
+ /* Couldn't interpolate due to lack of data. */
+ return -1;
+ }
+ }
+
+ /* Return the elapsed time since local flush time in microseconds. */
+ Assert(time != 0);
+ return now - time;
+}
DESCR("statistics: information about currently active backends");
DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ ));
DESCR("statistics: information about progress of backends running maintenance command");
-DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
+DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ ));
DESCR("statistics: information about currently active replication");
DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ ));
DESCR("statistics: information about WAL receiver");