Track statistics for spilling of changes from ReorderBuffer.
authorAmit Kapila <akapila@postgresql.org>
Thu, 8 Oct 2020 03:39:08 +0000 (09:09 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 8 Oct 2020 03:39:08 +0000 (09:09 +0530)
This adds the statistics about transactions spilled to disk from
ReorderBuffer. Users can query the pg_stat_replication_slots view to check
these stats and call pg_stat_reset_replication_slot to reset the stats of
a particular slot. Users can pass NULL in pg_stat_reset_replication_slot
to reset stats of all the slots.

This commit extends the statistics collector to track this information
about slots.

Author: Sawada Masahiko and Amit Kapila
Reviewed-by: Amit Kapila and Dilip Kumar
Discussion: https://postgr.es/m/CA+fd4k5_pPAYRTDrO2PbtTOe0eHQpBvuqmCr8ic39uTNmR49Eg@mail.gmail.com

16 files changed:
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/postmaster/pgstat.c
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/slot.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/include/replication/logical.h
src/include/replication/reorderbuffer.h
src/include/replication/slot.h
src/test/regress/expected/rules.out
src/tools/pgindent/typedefs.list

index 171ba7049c7bdb60a54c8839fd2025f30b570f6d..66566765f0c9117edb1b4ca9fe78d3785ba0de76 100644 (file)
@@ -314,6 +314,15 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_replication_slots</structname><indexterm><primary>pg_stat_replication_slots</primary></indexterm></entry>
+      <entry>One row per replication slot, showing statistics about
+       replication slot usage.
+       See <link linkend="monitoring-pg-stat-replication-slots-view">
+       <structname>pg_stat_replication_slots</structname></link> for details.
+      </entry>
+     </row>
+
      <row>
       <entry><structname>pg_stat_wal_receiver</structname><indexterm><primary>pg_stat_wal_receiver</primary></indexterm></entry>
       <entry>Only one row, showing statistics about the WAL receiver from
@@ -2552,6 +2561,88 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
  </sect2>
 
+ <sect2 id="monitoring-pg-stat-replication-slots-view">
+  <title><structname>pg_stat_replication_slots</structname></title>
+
+  <indexterm>
+   <primary>pg_stat_replication_slots</primary>
+  </indexterm>
+
+  <para>
+   The <structname>pg_stat_replication_slots</structname> view will contain
+   one row per logical replication slot, showing statistics about its usage.
+  </para>
+
+  <table id="pg-stat-replication-slots-view" xreflabel="pg_stat_replication_slots">
+   <title><structname>pg_stat_replication_slots</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        Column Type
+       </para>
+       <para>
+        Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>name</structfield> <type>text</type>
+       </para>
+       <para>
+        A unique, cluster-wide identifier for the replication slot
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>spill_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of transactions spilled to disk after the memory used by
+        logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The
+        counter gets incremented both for toplevel transactions and
+        subtransactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>spill_count</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of times transactions were spilled to disk. Transactions
+        may get spilled repeatedly, and this counter gets incremented on every
+        such invocation.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>spill_bytes</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transaction data spilled to disk.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
+       </para>
+       <para>
+        Time at which these statistics were last reset
+       </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+
+ </sect2>
+
  <sect2 id="monitoring-pg-stat-wal-receiver-view">
   <title><structname>pg_stat_wal_receiver</structname></title>
 
@@ -4802,6 +4893,27 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
         can be granted EXECUTE to run the function.
        </para></entry>
       </row>
+
+      <row>
+        <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+          <primary>pg_stat_reset_replication_slot</primary>
+        </indexterm>
+        <function>pg_stat_reset_replication_slot</function> ( <type>text</type> )
+        <returnvalue>void</returnvalue>
+       </para>
+       <para>
+         Resets statistics to zero for a single replication slot, or for all
+         replication slots in the cluster.  The argument can be either the name
+         of the slot to reset the stats or NULL.  If the argument is NULL, all
+         counters shown in the <structname>pg_stat_replication_slots</structname>
+         view for all replication slots are reset.
+       </para>
+       <para>
+         This function is restricted to superusers by default, but other users
+         can be granted EXECUTE to run the function.
+       </para></entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
index 923c2e2be1f60d76f985c273f75672bbe19c1f17..c29390760fe4b90cae0f9b4ae8c46b9eeb866fd3 100644 (file)
@@ -796,6 +796,15 @@ CREATE VIEW pg_stat_replication AS
         JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_replication_slots AS
+    SELECT
+            s.name,
+            s.spill_txns,
+            s.spill_count,
+            s.spill_bytes,
+            s.stats_reset
+    FROM pg_stat_get_replication_slots() AS s;
+
 CREATE VIEW pg_stat_slru AS
     SELECT
             s.name,
@@ -1453,6 +1462,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_shared(text) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_slru(text) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public;
 REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
 
 REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
 REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
index 5294c7854942ae2f71c39fdc1e93ee6088206bd0..822f0ebc6285a70c8bef1efb301a58b152c08b64 100644 (file)
@@ -51,6 +51,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/backendid.h"
 #include "storage/dsm.h"
@@ -284,6 +285,8 @@ static PgStat_ArchiverStats archiverStats;
 static PgStat_GlobalStats globalStats;
 static PgStat_WalStats walStats;
 static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_ReplSlotStats *replSlotStats;
+static int     nReplSlotStats;
 
 /*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
@@ -324,6 +327,9 @@ static void pgstat_read_current_status(void);
 static bool pgstat_write_statsfile_needed(void);
 static bool pgstat_db_requested(Oid databaseid);
 
+static int     pgstat_replslot_index(const char *name, bool create_it);
+static void pgstat_reset_replslot(int i, TimestampTz ts);
+
 static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
 static void pgstat_send_funcstats(void);
 static void pgstat_send_slru(void);
@@ -350,6 +356,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
 static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
 static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
 static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
+static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
 static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
 static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
 static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
@@ -362,6 +369,7 @@ static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
 static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
 static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
 static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
+static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 
 /* ------------------------------------------------------------
@@ -1437,6 +1445,61 @@ pgstat_reset_slru_counter(const char *name)
        pgstat_send(&msg, sizeof(msg));
 }
 
+/* ----------
+ * pgstat_reset_replslot_counter() -
+ *
+ *     Tell the statistics collector to reset a single replication slot
+ *     counter, or all replication slots counters (when name is null).
+ *
+ *     Permission checking for this function is managed through the normal
+ *     GRANT system.
+ * ----------
+ */
+void
+pgstat_reset_replslot_counter(const char *name)
+{
+       PgStat_MsgResetreplslotcounter msg;
+
+       if (pgStatSock == PGINVALID_SOCKET)
+               return;
+
+       if (name)
+       {
+               ReplicationSlot *slot;
+
+               /*
+                * Check if the slot exits with the given name. It is possible that by
+                * the time this message is executed the slot is dropped but at least
+                * this check will ensure that the given name is for a valid slot.
+                */
+               LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+               slot = SearchNamedReplicationSlot(name);
+               LWLockRelease(ReplicationSlotControlLock);
+
+               if (!slot)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("replication slot \"%s\" does not exist",
+                                                       name)));
+
+               /*
+                * Nothing to do for physical slots as we collect stats only for
+                * logical slots.
+                */
+               if (SlotIsPhysical(slot))
+                       return;
+
+               memcpy(&msg.m_slotname, name, NAMEDATALEN);
+               msg.clearall = false;
+       }
+       else
+               msg.clearall = true;
+
+       pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
+
+       pgstat_send(&msg, sizeof(msg));
+}
+
 /* ----------
  * pgstat_report_autovac() -
  *
@@ -1637,6 +1700,46 @@ pgstat_report_tempfile(size_t filesize)
        pgstat_send(&msg, sizeof(msg));
 }
 
+/* ----------
+ * pgstat_report_replslot() -
+ *
+ *     Tell the collector about replication slot statistics.
+ * ----------
+ */
+void
+pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+                                          int spillbytes)
+{
+       PgStat_MsgReplSlot msg;
+
+       /*
+        * Prepare and send the message
+        */
+       pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+       memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+       msg.m_drop = false;
+       msg.m_spill_txns = spilltxns;
+       msg.m_spill_count = spillcount;
+       msg.m_spill_bytes = spillbytes;
+       pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
+ * pgstat_report_replslot_drop() -
+ *
+ *     Tell the collector about dropping the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_drop(const char *slotname)
+{
+       PgStat_MsgReplSlot msg;
+
+       pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+       memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+       msg.m_drop = true;
+       pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
 
 /* ----------
  * pgstat_ping() -
@@ -2714,6 +2817,23 @@ pgstat_fetch_slru(void)
        return slruStats;
 }
 
+/*
+ * ---------
+ * pgstat_fetch_replslot() -
+ *
+ *     Support function for the SQL-callable pgstat* functions. Returns
+ *     a pointer to the replication slot statistics struct and sets the
+ *     number of entries in nslots_p.
+ * ---------
+ */
+PgStat_ReplSlotStats *
+pgstat_fetch_replslot(int *nslots_p)
+{
+       backend_read_statsfile();
+
+       *nslots_p = nReplSlotStats;
+       return replSlotStats;
+}
 
 /* ------------------------------------------------------------
  * Functions for management of the shared-memory PgBackendStatus array
@@ -4693,6 +4813,11 @@ PgstatCollectorMain(int argc, char *argv[])
                                                                                                 len);
                                        break;
 
+                               case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
+                                       pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
+                                                                                                        len);
+                                       break;
+
                                case PGSTAT_MTYPE_AUTOVAC_START:
                                        pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
                                        break;
@@ -4747,6 +4872,10 @@ PgstatCollectorMain(int argc, char *argv[])
                                                                                                 len);
                                        break;
 
+                               case PGSTAT_MTYPE_REPLSLOT:
+                                       pgstat_recv_replslot(&msg.msg_replslot, len);
+                                       break;
+
                                default:
                                        break;
                        }
@@ -4946,6 +5075,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
        const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
        const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
        int                     rc;
+       int                     i;
 
        elog(DEBUG2, "writing stats file \"%s\"", statfile);
 
@@ -5025,6 +5155,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
                (void) rc;                              /* we'll check for error with ferror */
        }
 
+       /*
+        * Write replication slot stats struct
+        */
+       for (i = 0; i < nReplSlotStats; i++)
+       {
+               fputc('R', fpout);
+               rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
+               (void) rc;                              /* we'll check for error with ferror */
+       }
+
        /*
         * No more output to be done. Close the temp file and replace the old
         * pgstat.stat with it.  The ferror() check replaces testing for error
@@ -5250,6 +5390,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
        dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
                                                 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+       /* Allocate the space for replication slot statistics */
+       replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats));
+       nReplSlotStats = 0;
+
        /*
         * Clear out global, archiver, WAL and SLRU statistics so they start from
         * zero in case we can't load an existing statsfile.
@@ -5273,6 +5417,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
        for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
                slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
 
+       /*
+        * Set the same reset timestamp for all replication slots too.
+        */
+       for (i = 0; i < max_replication_slots; i++)
+               replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
+
        /*
         * Try to open the stats file. If it doesn't exist, the backends simply
         * return zero for anything and the collector simply starts from scratch
@@ -5447,6 +5597,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 
                                break;
 
+                               /*
+                                * 'R'  A PgStat_ReplSlotStats struct describing a replication
+                                * slot follows.
+                                */
+                       case 'R':
+                               if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
+                                       != sizeof(PgStat_ReplSlotStats))
+                               {
+                                       ereport(pgStatRunningInCollector ? LOG : WARNING,
+                                                       (errmsg("corrupted statistics file \"%s\"",
+                                                                       statfile)));
+                                       memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+                                       goto done;
+                               }
+                               nReplSlotStats++;
+                               break;
+
                        case 'E':
                                goto done;
 
@@ -5658,6 +5825,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
        PgStat_ArchiverStats myArchiverStats;
        PgStat_WalStats myWalStats;
        PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+       PgStat_ReplSlotStats myReplSlotStats;
        FILE       *fpin;
        int32           format_id;
        const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -5772,6 +5940,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
 
                                break;
 
+                               /*
+                                * 'R'  A PgStat_ReplSlotStats struct describing a replication
+                                * slot follows.
+                                */
+                       case 'R':
+                               if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
+                                       != sizeof(PgStat_ReplSlotStats))
+                               {
+                                       ereport(pgStatRunningInCollector ? LOG : WARNING,
+                                                       (errmsg("corrupted statistics file \"%s\"",
+                                                                       statfile)));
+                                       FreeFile(fpin);
+                                       return false;
+                               }
+                               break;
+
                        case 'E':
                                goto done;
 
@@ -6367,6 +6551,46 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
        }
 }
 
+/* ----------
+ * pgstat_recv_resetreplslotcounter() -
+ *
+ *     Reset some replication slot statistics of the cluster.
+ * ----------
+ */
+static void
+pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
+                                                                int len)
+{
+       int                     i;
+       int                     idx = -1;
+       TimestampTz ts;
+
+       ts = GetCurrentTimestamp();
+       if (msg->clearall)
+       {
+               for (i = 0; i < nReplSlotStats; i++)
+                       pgstat_reset_replslot(i, ts);
+       }
+       else
+       {
+               /* Get the index of replication slot statistics to reset */
+               idx = pgstat_replslot_index(msg->m_slotname, false);
+
+               /*
+                * Nothing to do if the given slot entry is not found.  This could
+                * happen when the slot with the given name is removed and the
+                * corresponding statistics entry is also removed before receiving the
+                * reset message.
+                */
+               if (idx < 0)
+                       return;
+
+               /* Reset the stats for the requested replication slot */
+               pgstat_reset_replslot(idx, ts);
+       }
+}
+
+
 /* ----------
  * pgstat_recv_autovac() -
  *
@@ -6626,6 +6850,51 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
        dbentry->last_checksum_failure = msg->m_failure_time;
 }
 
+/* ----------
+ * pgstat_recv_replslot() -
+ *
+ *     Process a REPLSLOT message.
+ * ----------
+ */
+static void
+pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
+{
+       int                     idx;
+
+       /*
+        * Get the index of replication slot statistics.  On dropping, we don't
+        * create the new statistics.
+        */
+       idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+
+       /*
+        * The slot entry is not found or there is no space to accommodate the new
+        * entry.  This could happen when the message for the creation of a slot
+        * reached before the drop message even though the actual operations
+        * happen in reverse order.  In such a case, the next update of the
+        * statistics for the same slot will create the required entry.
+        */
+       if (idx < 0)
+               return;
+
+       Assert(idx >= 0 && idx <= max_replication_slots);
+       if (msg->m_drop)
+       {
+               /* Remove the replication slot statistics with the given name */
+               memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
+                          sizeof(PgStat_ReplSlotStats));
+               nReplSlotStats--;
+               Assert(nReplSlotStats >= 0);
+       }
+       else
+       {
+               /* Update the replication slot statistics */
+               replSlotStats[idx].spill_txns += msg->m_spill_txns;
+               replSlotStats[idx].spill_count += msg->m_spill_count;
+               replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+       }
+}
+
 /* ----------
  * pgstat_recv_tempfile() -
  *
@@ -6808,6 +7077,57 @@ pgstat_clip_activity(const char *raw_activity)
        return activity;
 }
 
+/* ----------
+ * pgstat_replslot_index
+ *
+ * Return the index of entry of a replication slot with the given name, or
+ * -1 if the slot is not found.
+ *
+ * create_it tells whether to create the new slot entry if it is not found.
+ * ----------
+ */
+static int
+pgstat_replslot_index(const char *name, bool create_it)
+{
+       int                     i;
+
+       Assert(nReplSlotStats <= max_replication_slots);
+       for (i = 0; i < nReplSlotStats; i++)
+       {
+               if (strcmp(replSlotStats[i].slotname, name) == 0)
+                       return i;                       /* found */
+       }
+
+       /*
+        * The slot is not found.  We don't want to register the new statistics if
+        * the list is already full or the caller didn't request.
+        */
+       if (i == max_replication_slots || !create_it)
+               return -1;
+
+       /* Register new slot */
+       memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+       memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+
+       return nReplSlotStats++;
+}
+
+/* ----------
+ * pgstat_reset_replslot
+ *
+ * Reset the replication slot stats at index 'i'.
+ * ----------
+ */
+static void
+pgstat_reset_replslot(int i, TimestampTz ts)
+{
+       /* reset only counters. Don't clear slot name */
+       replSlotStats[i].spill_txns = 0;
+       replSlotStats[i].spill_count = 0;
+       replSlotStats[i].spill_bytes = 0;
+       replSlotStats[i].stat_reset_timestamp = ts;
+}
+
 /*
  * pgstat_slru_index
  *
index f21f61d5e10b052d9c25a0d75055a557d52d309c..3f84ee99b86331a142dfdba5919743cd2da0b42f 100644 (file)
@@ -650,6 +650,12 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
        /* replay actions of all transaction + subtransactions in order */
        ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
                                                commit_time, origin_id, origin_lsn);
+
+       /*
+        * Update the decoding stats at transaction commit/abort. It is not clear
+        * that sending more or less frequently than this would be better.
+        */
+       UpdateDecodingStats(ctx);
 }
 
 /*
@@ -669,6 +675,9 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
        }
 
        ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+
+       /* update the decoding stats */
+       UpdateDecodingStats(ctx);
 }
 
 /*
index 0f6af952f93940032714fb957aaadccef2e91cb4..3346df32d3b4f15384b86d404b0abdd47c4061d8 100644 (file)
@@ -32,6 +32,7 @@
 #include "access/xlog_internal.h"
 #include "fmgr.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
@@ -1460,3 +1461,31 @@ ResetLogicalStreamingState(void)
        CheckXidAlive = InvalidTransactionId;
        bsysscan = false;
 }
+
+/*
+ * Report stats for a slot.
+ */
+void
+UpdateDecodingStats(LogicalDecodingContext *ctx)
+{
+       ReorderBuffer *rb = ctx->reorder;
+
+       /*
+        * Nothing to do if we haven't spilled anything since the last time the
+        * stats has been sent.
+        */
+       if (rb->spillBytes <= 0)
+               return;
+
+       elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
+                rb,
+                (long long) rb->spillTxns,
+                (long long) rb->spillCount,
+                (long long) rb->spillBytes);
+
+       pgstat_report_replslot(NameStr(ctx->slot->data.name),
+                                                  rb->spillTxns, rb->spillCount, rb->spillBytes);
+       rb->spillTxns = 0;
+       rb->spillCount = 0;
+       rb->spillBytes = 0;
+}
index 1975d629a6e2f025f041d0ad23d28dc7dbd95fba..189641bbf5b1fd39ce5b366cdf2d71132c48890a 100644 (file)
@@ -343,6 +343,10 @@ ReorderBufferAllocate(void)
        buffer->outbufsize = 0;
        buffer->size = 0;
 
+       buffer->spillTxns = 0;
+       buffer->spillCount = 0;
+       buffer->spillBytes = 0;
+
        buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
        dlist_init(&buffer->toplevel_by_lsn);
@@ -1579,6 +1583,13 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        {
                ReorderBufferRestoreCleanup(rb, txn);
                txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
+
+               /*
+                * We set this flag to indicate if the transaction is ever serialized.
+                * We need this to accurately update the stats as otherwise the same
+                * transaction can be counted as serialized multiple times.
+                */
+               txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
        }
 
        /* also reset the number of entries in the transaction */
@@ -3112,6 +3123,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        int                     fd = -1;
        XLogSegNo       curOpenSegNo = 0;
        Size            spilled = 0;
+       Size            size = txn->size;
 
        elog(DEBUG2, "spill %u changes in XID %u to disk",
                 (uint32) txn->nentries_mem, txn->xid);
@@ -3170,6 +3182,16 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
                spilled++;
        }
 
+       /* update the statistics iff we have spilled anything */
+       if (spilled)
+       {
+               rb->spillCount += 1;
+               rb->spillBytes += size;
+
+               /* don't consider already serialized transactions */
+               rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+       }
+
        Assert(spilled == txn->nentries_mem);
        Assert(dlist_is_empty(&txn->changes));
        txn->nentries_mem = 0;
index 42c78eabd4eb9ef805991d56a18ea7a4a63742cc..220b4cd6e99cde2008309a73abcb0dc5511a3f16 100644 (file)
@@ -99,7 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int                    max_replication_slots = 0;      /* the maximum number of replication
                                                                                 * slots */
 
-static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
 static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
                                                                                  const char *name, SlotAcquireBehavior behavior);
 static void ReplicationSlotDropAcquired(void);
@@ -314,6 +313,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 
        LWLockRelease(ReplicationSlotControlLock);
 
+       /*
+        * Create statistics entry for the new logical slot. We don't collect any
+        * stats for physical slots, so no need to create an entry for the same.
+        * See ReplicationSlotDropPtr for why we need to do this before releasing
+        * ReplicationSlotAllocationLock.
+        */
+       if (SlotIsLogical(slot))
+               pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+
        /*
         * Now that the slot has been marked as in_use and active, it's safe to
         * let somebody else try to allocate a slot.
@@ -331,7 +339,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
  *
  * The caller must hold ReplicationSlotControlLock in shared mode.
  */
-static ReplicationSlot *
+ReplicationSlot *
 SearchNamedReplicationSlot(const char *name)
 {
        int                     i;
@@ -683,6 +691,19 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
                ereport(WARNING,
                                (errmsg("could not remove directory \"%s\"", tmppath)));
 
+       /*
+        * Send a message to drop the replication slot to the stats collector.
+        * Since there is no guarantee of the order of message transfer on a UDP
+        * connection, it's possible that a message for creating a new slot
+        * reaches before a message for removing the old slot. We send the drop
+        * and create messages while holding ReplicationSlotAllocationLock to
+        * reduce that possibility. If the messages reached in reverse, we would
+        * lose one statistics update message. But the next update message will
+        * create the statistics for the replication slot.
+        */
+       if (SlotIsLogical(slot))
+               pgstat_report_replslot_drop(NameStr(slot->data.name));
+
        /*
         * We release this at the very end, so that nobody starts trying to create
         * a slot while we're still cleaning up the detritus of the old one.
index 24e191ea30b67c61c8d9005f5d7b24d36142049b..0d0d2e6d2bb2e42f66083e95a873ad53997f1269 100644 (file)
@@ -2069,6 +2069,20 @@ pg_stat_reset_slru(PG_FUNCTION_ARGS)
        PG_RETURN_VOID();
 }
 
+/* Reset replication slots stats (a specific one or all of them). */
+Datum
+pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
+{
+       char       *target = NULL;
+
+       if (!PG_ARGISNULL(0))
+               target = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+       pgstat_reset_replslot_counter(target);
+
+       PG_RETURN_VOID();
+}
+
 Datum
 pg_stat_get_archiver(PG_FUNCTION_ARGS)
 {
@@ -2134,3 +2148,69 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
        /* Returns the record as Datum */
        PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
 }
+
+/* Get the statistics for the replication slots */
+Datum
+pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 5
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext oldcontext;
+       PgStat_ReplSlotStats *slotstats;
+       int                     nstats;
+       int                     i;
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context that cannot accept a set")));
+       if (!(rsinfo->allowedModes & SFRM_Materialize))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is not allowed in this context")));
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               elog(ERROR, "return type must be a row type");
+
+       per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+       tupstore = tuplestore_begin_heap(true, false, work_mem);
+       rsinfo->returnMode = SFRM_Materialize;
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+
+       MemoryContextSwitchTo(oldcontext);
+
+       slotstats = pgstat_fetch_replslot(&nstats);
+       for (i = 0; i < nstats; i++)
+       {
+               Datum           values[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+               bool            nulls[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+               PgStat_ReplSlotStats *s = &(slotstats[i]);
+
+               MemSet(values, 0, sizeof(values));
+               MemSet(nulls, 0, sizeof(nulls));
+
+               values[0] = PointerGetDatum(cstring_to_text(s->slotname));
+               values[1] = Int64GetDatum(s->spill_txns);
+               values[2] = Int64GetDatum(s->spill_count);
+               values[3] = Int64GetDatum(s->spill_bytes);
+
+               if (s->stat_reset_timestamp == 0)
+                       nulls[4] = true;
+               else
+                       values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       }
+
+       tuplestore_donestoring(tupstore);
+
+       return (Datum) 0;
+}
index 1be017972438cf3f019d54e61fb8ce3471230a2f..584c61728439f953293369a1021dd14db061e104 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202010021
+#define CATALOG_VERSION_NO     202010081
 
 #endif
index d6f3e2d286b46bdfb6de3f15e20ef388304b5296..22340baf1c67db088e48c0998bac0347ea50e1ff 100644 (file)
   proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
   proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
   prosrc => 'pg_stat_get_wal_receiver' },
+{ oid => '8595', descr => 'statistics: information about replication slots',
+  proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
+  proretset => 't', provolatile => 's', proparallel => 'r',
+  prorettype => 'record', proargtypes => '',
+  proallargtypes => '{text,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o}',
+  proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}',
+  prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
   descr => 'statistics: reset collected statistics for a single SLRU',
   proname => 'pg_stat_reset_slru', proisstrict => 'f', provolatile => 'v',
   prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_slru' },
+{ oid => '8596',
+  descr => 'statistics: reset collected statistics for a single replication slot',
+  proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v',
+  prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' },
 
 { oid => '3163', descr => 'current trigger depth',
   proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
index 343eef507eab3045f1f770e97e636799c2d12b33..a821ff4f158fee684410ed4512b004ad63961002 100644 (file)
@@ -56,6 +56,7 @@ typedef enum StatMsgType
        PGSTAT_MTYPE_RESETSHAREDCOUNTER,
        PGSTAT_MTYPE_RESETSINGLECOUNTER,
        PGSTAT_MTYPE_RESETSLRUCOUNTER,
+       PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
        PGSTAT_MTYPE_AUTOVAC_START,
        PGSTAT_MTYPE_VACUUM,
        PGSTAT_MTYPE_ANALYZE,
@@ -68,7 +69,8 @@ typedef enum StatMsgType
        PGSTAT_MTYPE_RECOVERYCONFLICT,
        PGSTAT_MTYPE_TEMPFILE,
        PGSTAT_MTYPE_DEADLOCK,
-       PGSTAT_MTYPE_CHECKSUMFAILURE
+       PGSTAT_MTYPE_CHECKSUMFAILURE,
+       PGSTAT_MTYPE_REPLSLOT,
 } StatMsgType;
 
 /* ----------
@@ -358,6 +360,18 @@ typedef struct PgStat_MsgResetslrucounter
        int                     m_index;
 } PgStat_MsgResetslrucounter;
 
+/* ----------
+ * PgStat_MsgResetreplslotcounter Sent by the backend to tell the collector
+ *                                                             to reset replication slot counter(s)
+ * ----------
+ */
+typedef struct PgStat_MsgResetreplslotcounter
+{
+       PgStat_MsgHdr m_hdr;
+       char            m_slotname[NAMEDATALEN];
+       bool            clearall;
+} PgStat_MsgResetreplslotcounter;
+
 /* ----------
  * PgStat_MsgAutovacStart              Sent by the autovacuum daemon to signal
  *                                                             that a database is going to be processed
@@ -465,6 +479,22 @@ typedef struct PgStat_MsgSLRU
        PgStat_Counter m_truncate;
 } PgStat_MsgSLRU;
 
+/* ----------
+ * PgStat_MsgReplSlot  Sent by a backend or a wal sender to update replication
+ *                                             slot statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgReplSlot
+{
+       PgStat_MsgHdr m_hdr;
+       char            m_slotname[NAMEDATALEN];
+       bool            m_drop;
+       PgStat_Counter m_spill_txns;
+       PgStat_Counter m_spill_count;
+       PgStat_Counter m_spill_bytes;
+} PgStat_MsgReplSlot;
+
+
 /* ----------
  * PgStat_MsgRecoveryConflict  Sent by the backend upon recovery conflict
  * ----------
@@ -603,6 +633,7 @@ typedef union PgStat_Msg
        PgStat_MsgResetsharedcounter msg_resetsharedcounter;
        PgStat_MsgResetsinglecounter msg_resetsinglecounter;
        PgStat_MsgResetslrucounter msg_resetslrucounter;
+       PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
        PgStat_MsgAutovacStart msg_autovacuum_start;
        PgStat_MsgVacuum msg_vacuum;
        PgStat_MsgAnalyze msg_analyze;
@@ -616,6 +647,7 @@ typedef union PgStat_Msg
        PgStat_MsgDeadlock msg_deadlock;
        PgStat_MsgTempFile msg_tempfile;
        PgStat_MsgChecksumFailure msg_checksumfailure;
+       PgStat_MsgReplSlot msg_replslot;
 } PgStat_Msg;
 
 
@@ -627,7 +659,7 @@ typedef union PgStat_Msg
  * ------------------------------------------------------------
  */
 
-#define PGSTAT_FILE_FORMAT_ID  0x01A5BC9E
+#define PGSTAT_FILE_FORMAT_ID  0x01A5BC9F
 
 /* ----------
  * PgStat_StatDBEntry                  The collector's data per database
@@ -782,6 +814,17 @@ typedef struct PgStat_SLRUStats
        TimestampTz stat_reset_timestamp;
 } PgStat_SLRUStats;
 
+/*
+ * Replication slot statistics kept in the stats collector
+ */
+typedef struct PgStat_ReplSlotStats
+{
+       char            slotname[NAMEDATALEN];
+       PgStat_Counter spill_txns;
+       PgStat_Counter spill_count;
+       PgStat_Counter spill_bytes;
+       TimestampTz stat_reset_timestamp;
+} PgStat_ReplSlotStats;
 
 /* ----------
  * Backend states
@@ -1330,6 +1373,7 @@ extern void pgstat_reset_counters(void);
 extern void pgstat_reset_shared_counters(const char *);
 extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
 extern void pgstat_reset_slru_counter(const char *);
+extern void pgstat_reset_replslot_counter(const char *name);
 
 extern void pgstat_report_autovac(Oid dboid);
 extern void pgstat_report_vacuum(Oid tableoid, bool shared,
@@ -1342,6 +1386,9 @@ extern void pgstat_report_recovery_conflict(int reason);
 extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
+extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+                                                                  int spillbytes);
+extern void pgstat_report_replslot_drop(const char *slotname);
 
 extern void pgstat_initialize(void);
 extern void pgstat_bestart(void);
@@ -1508,6 +1555,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
 extern PgStat_GlobalStats *pgstat_fetch_global(void);
 extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
 extern PgStat_SLRUStats *pgstat_fetch_slru(void);
+extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
 
 extern void pgstat_count_slru_page_zeroed(int slru_idx);
 extern void pgstat_count_slru_page_hit(int slru_idx);
index 45abc444b7a55306cec189efed8e531a29ed531c..40bab7ee02df48ed4d203af1b61f1f23c10fbf68 100644 (file)
@@ -122,5 +122,6 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
+extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
 #endif
index 1ae17d5f11fd73fadd6b3877b2fb00600ac2b435..0cc3aebb11131fbd75c53f8504987fe142da23f3 100644 (file)
@@ -162,9 +162,10 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
 #define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_STREAMED         0x0008
-#define RBTXN_HAS_TOAST_INSERT    0x0010
-#define RBTXN_HAS_SPEC_INSERT     0x0020
+#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
+#define RBTXN_IS_STREAMED         0x0010
+#define RBTXN_HAS_TOAST_INSERT    0x0020
+#define RBTXN_HAS_SPEC_INSERT     0x0040
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -184,6 +185,12 @@ typedef struct ReorderBufferChange
        ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
 )
 
+/* Has this transaction ever been spilled to disk? */
+#define rbtxn_is_serialized_clear(txn) \
+( \
+       ((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
+)
+
 /* This transaction's changes has toast insert, without main table insert. */
 #define rbtxn_has_toast_insert(txn) \
 ( \
@@ -525,6 +532,17 @@ struct ReorderBuffer
 
        /* memory accounting */
        Size            size;
+
+       /*
+        * Statistics about transactions spilled to disk.
+        *
+        * A single transaction may be spilled repeatedly, which is why we keep
+        * two different counters. For spilling, the transaction counter includes
+        * both toplevel transactions and subtransactions.
+        */
+       int64           spillTxns;              /* number of transactions spilled to disk */
+       int64           spillCount;             /* spill-to-disk invocation counter */
+       int64           spillBytes;             /* amount of data spilled to disk */
 };
 
 
index 31362585ecb1e2d0c3eabf25f6043cceb534b6af..63bab6967fb587cccaf7ad5d3fbe8ee32daf60cd 100644 (file)
@@ -210,6 +210,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
index af4192f9a87222a6ce613f0abb146f0bb6896ac3..cf2a9b44082eaa1bab0a595f574681bf39273453 100644 (file)
@@ -2018,6 +2018,12 @@ pg_stat_replication| SELECT s.pid,
    FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
+pg_stat_replication_slots| SELECT s.name,
+    s.spill_txns,
+    s.spill_count,
+    s.spill_bytes,
+    s.stats_reset
+   FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,
index 4191f948696343b208f705f4d6392548e84b02cb..be570329ea73a713b1df95004360fb5d11c1a823 100644 (file)
@@ -1832,7 +1832,9 @@ PgStat_MsgFuncstat
 PgStat_MsgHdr
 PgStat_MsgInquiry
 PgStat_MsgRecoveryConflict
+PgStat_MsgReplSlot
 PgStat_MsgResetcounter
+PgStat_MsgResetreplslotcounter
 PgStat_MsgResetsharedcounter
 PgStat_MsgResetsinglecounter
 PgStat_MsgResetslrucounter
@@ -1842,6 +1844,7 @@ PgStat_MsgTabstat
 PgStat_MsgTempFile
 PgStat_MsgVacuum
 PgStat_MsgWal
+PgStat_ReplSlotStats
 PgStat_SLRUStats
 PgStat_Shared_Reset_Target
 PgStat_Single_Reset_Type