Morph pg_replication_slots.min_safe_lsn to safe_wal_size
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 7 Jul 2020 17:08:00 +0000 (13:08 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Tue, 7 Jul 2020 17:08:00 +0000 (13:08 -0400)
The previous definition of the column was almost universally disliked,
so provide this updated definition which is more useful for monitoring
purposes: a large positive value is good, while zero or a negative value
means danger.  This should be operationally more convenient.

Backpatch to 13, where the new column to pg_replication_slots (and the
feature it represents) were added.

Author: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reported-by: Fujii Masao <masao.fujii@oss.nttdata.com>
Discussion: https://postgr.es/m/9ddfbf8c-2f67-904d-44ed-cf8bc5916228@oss.nttdata.com

doc/src/sgml/catalogs.sgml
src/backend/access/transam/xlog.c
src/backend/catalog/system_views.sql
src/backend/replication/slotfuncs.c
src/include/access/xlog_internal.h
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/test/recovery/t/019_replslot_limit.pl
src/test/regress/expected/rules.out

index 003d2783703c6cbb6b070ba7541a12e99bf54e5f..361793b337aa750905037ad09902cf3ce206062b 100644 (file)
@@ -11275,10 +11275,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>min_safe_lsn</structfield> <type>pg_lsn</type>
+       <structfield>safe_wal_size</structfield> <type>int8</type>
       </para>
       <para>
-       The minimum LSN currently available for walsenders.
+       The number of bytes that can be written to WAL such that this slot
+       is not in danger of getting in state "lost".  It is NULL for lost
+       slots, as well as if <varname>max_slot_wal_keep_size</varname>
+       is <literal>-1</literal>.
       </para></entry>
      </row>
     </tbody>
index fd93bcfaebaae5e38c51d3e22ae26ffb00923ed6..c2feb9257621685ab1af30516e5216f9f8e4e15c 100644 (file)
@@ -764,8 +764,7 @@ static ControlFileData *ControlFile = NULL;
  * Convert values of GUCs measured in megabytes to equiv. segment count.
  * Rounds down.
  */
-#define ConvertToXSegs(x, segsize) \
-   ((x) / ((segsize) / (1024 * 1024)))
+#define ConvertToXSegs(x, segsize) XLogMBVarToSegs((x), (segsize))
 
 /* The number of bytes in a WAL segment usable for WAL data. */
 static int UsableBytesInSegment;
@@ -9513,8 +9512,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
    XLogSegNo   targetSeg;      /* segid of targetLSN */
    XLogSegNo   oldestSeg;      /* actual oldest segid */
    XLogSegNo   oldestSegMaxWalSize;    /* oldest segid kept by max_wal_size */
-   XLogSegNo   oldestSlotSeg = InvalidXLogRecPtr;  /* oldest segid kept by
-                                                    * slot */
+   XLogSegNo   oldestSlotSeg;  /* oldest segid kept by slot */
    uint64      keepSegs;
 
    /*
index 5314e9348fa776217d7de9f829212894db53a0c5..73676d04cf4b82461640b2cb5092a28049680bd6 100644 (file)
@@ -879,7 +879,7 @@ CREATE VIEW pg_replication_slots AS
             L.restart_lsn,
             L.confirmed_flush_lsn,
             L.wal_status,
-            L.min_safe_lsn
+            L.safe_wal_size
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
index 88033a79b21b60ae12d2e3ab18b6bc62670b76cf..9fe147bf44ec399cd041b75c6ab6e2ccd8bf6792 100644 (file)
@@ -242,6 +242,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
    Tuplestorestate *tupstore;
    MemoryContext per_query_ctx;
    MemoryContext oldcontext;
+   XLogRecPtr  currlsn;
    int         slotno;
 
    /* check to see if caller supports us returning a tuplestore */
@@ -274,6 +275,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
    MemoryContextSwitchTo(oldcontext);
 
+   currlsn = GetXLogWriteRecPtr();
+
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (slotno = 0; slotno < max_replication_slots; slotno++)
    {
@@ -282,7 +285,6 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
        Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
        bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
        WALAvailability walstate;
-       XLogSegNo   last_removed_seg;
        int         i;
 
        if (!slot->in_use)
@@ -380,6 +382,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                 * we looked.  If checkpointer signalled the process to
                 * termination, then it's definitely lost; but if a process is
                 * still alive, then "unreserved" seems more appropriate.
+                *
+                * If we do change it, save the state for safe_wal_size below.
                 */
                if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
                {
@@ -387,10 +391,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
                    SpinLockAcquire(&slot->mutex);
                    pid = slot->active_pid;
+                   slot_contents.data.restart_lsn = slot->data.restart_lsn;
                    SpinLockRelease(&slot->mutex);
                    if (pid != 0)
                    {
                        values[i++] = CStringGetTextDatum("unreserved");
+                       walstate = WALAVAIL_UNRESERVED;
                        break;
                    }
                }
@@ -398,18 +404,32 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
                break;
        }
 
-       if (max_slot_wal_keep_size_mb >= 0 &&
-           (walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
-           ((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+       /*
+        * safe_wal_size is only computed for slots that have not been lost,
+        * and only if there's a configured maximum size.
+        */
+       if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
+           nulls[i++] = true;
+       else
        {
-           XLogRecPtr  min_safe_lsn;
+           XLogSegNo   targetSeg;
+           XLogSegNo   keepSegs;
+           XLogSegNo   failSeg;
+           XLogRecPtr  failLSN;
 
-           XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
-                                   wal_segment_size, min_safe_lsn);
-           values[i++] = Int64GetDatum(min_safe_lsn);
+           XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
+
+           /* determine how many segments slots can be kept by slots ... */
+           keepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+           /* ... and override by wal_keep_segments as needed */
+           keepSegs = Max(keepSegs, wal_keep_segments);
+
+           /* if currpos reaches failLSN, we lose our segment */
+           failSeg = targetSeg + keepSegs + 1;
+           XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
+
+           values[i++] = Int64GetDatum(failLSN - currlsn);
        }
-       else
-           nulls[i++] = true;
 
        Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
index c8869d5226c0072f125530f7353b9e6c89159103..88f3d767007beab115fad687dbc53a65b872b828 100644 (file)
@@ -121,6 +121,13 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define XLByteToPrevSeg(xlrp, logSegNo, wal_segsz_bytes) \
    logSegNo = ((xlrp) - 1) / (wal_segsz_bytes)
 
+/*
+ * Convert values of GUCs measured in megabytes to equiv. segment count.
+ * Rounds down.
+ */
+#define XLogMBVarToSegs(mbvar, wal_segsz_bytes) \
+   ((mbvar) / ((wal_segsz_bytes) / (1024 * 1024)))
+
 /*
  * Is an XLogRecPtr within a particular XLOG segment?
  *
index 54518cd40ed0e387851d807840ed5c43785c3068..1b35510d46dc03a006dd24b028e2beaa6a6dbf82 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202007061
+#define CATALOG_VERSION_NO 202007071
 
 #endif
index 38295aca4831e97514fa31446c1b1b2a79985c4b..3c89c53aa28d4b3da0693aa10e9be8cb4ead8b31 100644 (file)
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
index 7d22ae57201a31a6e9eeb3872626975ee6f3f6f4..af656c6902f632cbfb7923412bd59bef23ce723e 100644 (file)
@@ -28,7 +28,7 @@ $node_master->safe_psql('postgres',
 
 # The slot state and remain should be null before the first connection
 my $result = $node_master->safe_psql('postgres',
-   "SELECT restart_lsn IS NULL, wal_status is NULL, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
+   "SELECT restart_lsn IS NULL, wal_status is NULL, safe_wal_size is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
 is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
 
@@ -52,9 +52,9 @@ $node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
 # Stop standby
 $node_standby->stop;
 
-# Preparation done, the slot is the state "normal" now
+# Preparation done, the slot is the state "reserved" now
 $result = $node_master->safe_psql('postgres',
-   "SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
+   "SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
 is($result, "reserved|t", 'check the catching-up state');
 
@@ -64,7 +64,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 
 # The slot is always "safe" when fitting max_wal_size
 $result = $node_master->safe_psql('postgres',
-   "SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
+   "SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
 is($result, "reserved|t",
    'check that it is safe if WAL fits in max_wal_size');
@@ -74,7 +74,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 
 # The slot is always "safe" when max_slot_wal_keep_size is not set
 $result = $node_master->safe_psql('postgres',
-   "SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
+   "SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
 is($result, "reserved|t", 'check that slot is working');
 
@@ -94,9 +94,7 @@ max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
 ));
 $node_master->reload;
 
-# The slot is in safe state. The distance from the min_safe_lsn should
-# be as almost (max_slot_wal_keep_size - 1) times large as the segment
-# size
+# The slot is in safe state.
 
 $result = $node_master->safe_psql('postgres',
    "SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
@@ -110,7 +108,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 $result = $node_master->safe_psql('postgres',
    "SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
 is($result, "reserved",
-   'check that min_safe_lsn gets close to the current LSN');
+   'check that safe_wal_size gets close to the current LSN');
 
 # The standby can reconnect to master
 $node_standby->start;
@@ -152,9 +150,9 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
 # Advance WAL again without checkpoint; remain goes to 0.
 advance_wal($node_master, 1);
 
-# Slot gets into 'unreserved' state
+# Slot gets into 'unreserved' state and safe_wal_size is negative
 $result = $node_master->safe_psql('postgres',
-   "SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
+   "SELECT wal_status, safe_wal_size <= 0 FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
 is($result, "unreserved|t",
    'check that the slot state changes to "unreserved"');
@@ -186,7 +184,7 @@ ok( find_in_log(
 
 # This slot should be broken
 $result = $node_master->safe_psql('postgres',
-   "SELECT slot_name, active, restart_lsn IS NULL, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
+   "SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size FROM pg_replication_slots WHERE slot_name = 'rep1'"
 );
 is($result, "rep1|f|t|lost|",
    'check that the slot became inactive and the state "lost" persists');
index b813e322153d27481f13d6df27a3892531e772b0..93bb2159ca892b6fb2e85b0e3831f3abf8cd5d7e 100644 (file)
@@ -1464,8 +1464,8 @@ pg_replication_slots| SELECT l.slot_name,
     l.restart_lsn,
     l.confirmed_flush_lsn,
     l.wal_status,
-    l.min_safe_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn)
+    l.safe_wal_size
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,