Add information of total data processed to replication slot stats.
authorAmit Kapila <akapila@postgresql.org>
Fri, 16 Apr 2021 02:04:43 +0000 (07:34 +0530)
committerAmit Kapila <akapila@postgresql.org>
Fri, 16 Apr 2021 02:04:43 +0000 (07:34 +0530)
This adds the statistics about total transactions count and total
transaction data logically sent to the decoding output plugin from
ReorderBuffer. Users can query the pg_stat_replication_slots view to check
these stats.

Suggested-by: Andres Freund
Author: Vignesh C and Amit Kapila
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de

15 files changed:
contrib/test_decoding/Makefile
contrib/test_decoding/expected/stats.out
contrib/test_decoding/sql/stats.sql
contrib/test_decoding/t/001_repl_stats.pl [new file with mode: 0644]
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/postmaster/pgstat.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/include/replication/reorderbuffer.h
src/test/regress/expected/rules.out

index c5e28ce5cca78bbe48bb2f9e21faa15b632af1e0..9a31e0b8795874f58e6822344d684809a3fe609f 100644 (file)
@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 # typical installcheck users do not have (e.g. buildfarm clients).
 NO_INSTALLCHECK = 1
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
index bca36fa90309c231750100e444be80309e98bdc4..bc8e601eab6b267182818d7016bc039940dd5941 100644 (file)
@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 
 CREATE TABLE stats_test(data text);
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -16,12 +16,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
 (1 row)
 
 -- reset the slot stats, and wait for stats collector to reset
@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
  
 (1 row)
 
-SELECT wait_for_decode_stats(true);
+SELECT wait_for_decode_stats(true, true);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot |          0 |           0
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot |          0 |           0 |          0 |           0
 (1 row)
 
 -- decode and check stats again.
@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
   5002
 (1 row)
 
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
+ wait_for_decode_stats 
+-----------------------
+(1 row)
+
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t          | t           | t          | t
+(1 row)
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+ pg_stat_reset_replication_slot 
+--------------------------------
+(1 row)
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
  wait_for_decode_stats 
 -----------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-    slot_name    | spill_txns | spill_count 
------------------+------------+-------------
- regression_slot | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+    slot_name    | spill_txns | spill_count | total_txns | total_bytes 
+-----------------+------------+-------------+------------+-------------
+ regression_slot | f          | f           | t          | t
 (1 row)
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
 (1 row)
 
 COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
index 51294e48e87fd01e547d358dc4885ba801808584..8c34aeced1de0d4c045b6835e844792f7d2b2edc 100644 (file)
@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
 CREATE TABLE stats_test(data text);
 
 -- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
 DECLARE
   start_time timestamptz := clock_timestamp();
   updated bool;
@@ -14,12 +14,25 @@ BEGIN
   -- we don't want to wait forever; loop will exit after 30 seconds
   FOR i IN 1 .. 300 LOOP
 
-    -- check to see if all updates have been reset/updated
-    SELECT CASE WHEN check_reset THEN (spill_txns = 0)
-                ELSE (spill_txns > 0)
-           END
-    INTO updated
-    FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+    IF check_spill_txns THEN
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+                  ELSE (spill_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    ELSE
+
+      -- check to see if all updates have been reset/updated
+      SELECT CASE WHEN check_reset THEN (total_txns = 0)
+                  ELSE (total_txns > 0)
+             END
+      INTO updated
+      FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+    END IF;
 
     exit WHEN updated;
 
@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
 -- Check stats, wait for the stats collector to update. We can't test the
 -- exact stats count as that can vary if any background transaction (say by
 -- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- reset the slot stats, and wait for stats collector to reset
 SELECT pg_stat_reset_replication_slot('regression_slot');
-SELECT wait_for_decode_stats(true);
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(true, true);
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
 
 -- decode and check stats again.
 SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
 SELECT slot_name FROM pg_stat_replication_slots;
 COMMIT;
 
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
new file mode 100644 (file)
index 0000000..11b6cd9
--- /dev/null
@@ -0,0 +1,76 @@
+# Test replication statistics data in pg_stat_replication_slots is sane after
+# drop replication slot and restart.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Test set-up
+my $node = get_new_node('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+# Create table.
+$node->safe_psql('postgres',
+        "CREATE TABLE test_repl_stat(col1 int)");
+
+# Create replication slots.
+$node->safe_psql(
+   'postgres', qq[
+   SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');
+   SELECT pg_create_logical_replication_slot('regression_slot2', 'test_decoding');
+   SELECT pg_create_logical_replication_slot('regression_slot3', 'test_decoding');
+   SELECT pg_create_logical_replication_slot('regression_slot4', 'test_decoding');
+]);
+
+# Insert some data.
+$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+
+$node->safe_psql(
+   'postgres', qq[
+   SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL,
+   NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+   SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL,
+   NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+   SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL,
+   NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+   SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL,
+   NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+]);
+
+# Wait for the statistics to be updated.
+$node->poll_query_until(
+   'postgres', qq[
+   SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
+   WHERE slot_name ~ 'regression_slot'
+   AND total_txns > 0 AND total_bytes > 0;
+]) or die "Timed out while waiting for statistics to be updated";
+
+# Test to drop one of the replication slot and verify replication statistics data is
+# fine after restart.
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+
+$node->stop;
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+my $result = $node->safe_psql('postgres',
+   "SELECT slot_name, total_txns > 0 AS total_txn,
+   total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
+   ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are updated');
+
+# cleanup
+$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+
+# shutdown
+$node->stop;
index 8287587f614df4211735a6fbb74f81c366b05c41..c44d087508093eb2657831426392e101bb3b0a80 100644 (file)
@@ -2716,6 +2716,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_txns</structfield> <type>bigint</type>
+       </para>
+       <para>
+        Number of decoded transactions sent to the decoding output plugin for
+        this slot. This counter is used to maintain the top level transactions,
+        so the counter is not incremented for subtransactions. Note that this
+        includes the transactions that are streamed and/or spilled.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>total_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of decoded transactions data sent to the decoding output plugin
+        while decoding the changes from WAL for this slot. This can be used to
+        gauge the total amount of data sent during logical decoding. Note that
+        this includes the data that is streamed and/or spilled.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
index 451db2ee0a064ab1fc54ff13aeb20024a1be53b4..6d78b335908189092b30298def9913f91805b368 100644 (file)
@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_txns,
             s.stream_count,
             s.stream_bytes,
+            s.total_txns,
+            s.total_bytes,
             s.stats_reset
     FROM pg_stat_get_replication_slots() AS s;
 
index 666ce95d083d8a3bd999d5f815d745c2c650a5e0..e1ec7d8b7d65a9c8e9e92df97f242f9426cb4b3f 100644 (file)
@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
    msg.m_stream_txns = repSlotStat->stream_txns;
    msg.m_stream_count = repSlotStat->stream_count;
    msg.m_stream_bytes = repSlotStat->stream_bytes;
+   msg.m_total_txns = repSlotStat->total_txns;
+   msg.m_total_bytes = repSlotStat->total_bytes;
    pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
 }
 
@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
        replSlotStats[idx].stream_txns += msg->m_stream_txns;
        replSlotStats[idx].stream_count += msg->m_stream_count;
        replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+       replSlotStats[idx].total_txns += msg->m_total_txns;
+       replSlotStats[idx].total_bytes += msg->m_total_bytes;
    }
 }
 
@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
    replSlotStats[i].stream_txns = 0;
    replSlotStats[i].stream_count = 0;
    replSlotStats[i].stream_bytes = 0;
+   replSlotStats[i].total_txns = 0;
+   replSlotStats[i].total_bytes = 0;
    replSlotStats[i].stat_reset_timestamp = ts;
 }
 
index 68e210ce12bceef5c075702e7a83fb38cb189fc5..35b0c67641291c7a012e7704079931072f57784d 100644 (file)
@@ -1775,21 +1775,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
    ReorderBuffer *rb = ctx->reorder;
    PgStat_ReplSlotStats repSlotStat;
 
-   /*
-    * Nothing to do if we haven't spilled or streamed anything since the last
-    * time the stats has been sent.
-    */
-   if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+   /* Nothing to do if we don't have any replication stats to be sent. */
+   if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
        return;
 
-   elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+   elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
         rb,
         (long long) rb->spillTxns,
         (long long) rb->spillCount,
         (long long) rb->spillBytes,
         (long long) rb->streamTxns,
         (long long) rb->streamCount,
-        (long long) rb->streamBytes);
+        (long long) rb->streamBytes,
+        (long long) rb->totalTxns,
+        (long long) rb->totalBytes);
 
    namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
    repSlotStat.spill_txns = rb->spillTxns;
@@ -1798,12 +1797,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
    repSlotStat.stream_txns = rb->streamTxns;
    repSlotStat.stream_count = rb->streamCount;
    repSlotStat.stream_bytes = rb->streamBytes;
+   repSlotStat.total_txns = rb->totalTxns;
+   repSlotStat.total_bytes = rb->totalBytes;
 
    pgstat_report_replslot(&repSlotStat);
+
    rb->spillTxns = 0;
    rb->spillCount = 0;
    rb->spillBytes = 0;
    rb->streamTxns = 0;
    rb->streamCount = 0;
    rb->streamBytes = 0;
+   rb->totalTxns = 0;
+   rb->totalBytes = 0;
 }
index 52d06285a21528dd5fa8b26fc9dffc65ead2dfc5..5cb484f03231ef66a0933a40e21389a71fa519ce 100644 (file)
@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
    buffer->streamTxns = 0;
    buffer->streamCount = 0;
    buffer->streamBytes = 0;
+   buffer->totalTxns = 0;
+   buffer->totalBytes = 0;
 
    buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
@@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
        dlist_delete(&change->node);
        dlist_push_tail(&state->old_change, &change->node);
 
+       /*
+        * Update the total bytes processed before releasing the current set
+        * of changes and restoring the new set of changes.
+        */
+       rb->totalBytes += rb->size;
        if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
                                        &state->entries[off].segno))
        {
@@ -2363,6 +2370,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
        ReorderBufferIterTXNFinish(rb, iterstate);
        iterstate = NULL;
 
+       /*
+        * Update total transaction count and total transaction bytes
+        * processed. Ensure to not count the streamed transaction multiple
+        * times.
+        *
+        * Note that the statistics computation has to be done after
+        * ReorderBufferIterTXNFinish as it releases the serialized change
+        * which we have already accounted in ReorderBufferIterTXNNext.
+        */
+       if (!rbtxn_is_streamed(txn))
+           rb->totalTxns++;
+
+       rb->totalBytes += rb->size;
+
        /*
         * Done with current changes, send the last message for this set of
         * changes depending upon streaming mode.
index 521ba7361439d65f9fb524e36aec3b6a83bf2d62..2680190a4026bdfc9961ed54f70012fc7a4ba706 100644 (file)
@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
    ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    TupleDesc   tupdesc;
    Tuplestorestate *tupstore;
@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
        values[4] = Int64GetDatum(s->stream_txns);
        values[5] = Int64GetDatum(s->stream_count);
        values[6] = Int64GetDatum(s->stream_bytes);
+       values[7] = Int64GetDatum(s->total_txns);
+       values[8] = Int64GetDatum(s->total_bytes);
 
        if (s->stat_reset_timestamp == 0)
-           nulls[7] = true;
+           nulls[9] = true;
        else
-           values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+           values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
 
        tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    }
index 87e9596da5691a2cbd181416da914b45d19d9d2c..904b0c97ec2118df2c6b7b794efe7d68e37cb866 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202104151
+#define CATALOG_VERSION_NO 202104161
 
 #endif
index f4957653ae6c067c0a28182207c797ab373e1761..591753fe817128a3a4e7f60ffcaf7434738b78e8 100644 (file)
   proname => 'pg_stat_get_replication_slots', prorows => '10',
   proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => '',
-  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+  proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slots' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
index 8e11215058e801180dcf5039d006ba65a6beaf16..2aeb3cded4d71c9a462b4f8af36647c25de0282a 100644 (file)
@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
    PgStat_Counter m_stream_txns;
    PgStat_Counter m_stream_count;
    PgStat_Counter m_stream_bytes;
+   PgStat_Counter m_total_txns;
+   PgStat_Counter m_total_bytes;
 } PgStat_MsgReplSlot;
 
 /* ----------
@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotStats
    PgStat_Counter stream_txns;
    PgStat_Counter stream_count;
    PgStat_Counter stream_bytes;
+   PgStat_Counter total_txns;
+   PgStat_Counter total_bytes;
    TimestampTz stat_reset_timestamp;
 } PgStat_ReplSlotStats;
 
index 565a961d6ab242d52fc77563243ab86776fef6d7..bfab8303ee773a0f8accdb209559379bc4818c8c 100644 (file)
@@ -618,6 +618,13 @@ struct ReorderBuffer
    int64       streamTxns;     /* number of transactions streamed */
    int64       streamCount;    /* streaming invocation counter */
    int64       streamBytes;    /* amount of data streamed */
+
+   /*
+    * Statistics about all the transactions sent to the decoding output
+    * plugin
+    */
+   int64       totalTxns;      /* total number of transactions sent */
+   int64       totalBytes;     /* total amount of data sent */
 };
 
 
index 186e6c966c68f450e55264ce9353defaae24c4d2..6399f3feef8033135459e1b58a3e0139dd0b9c37 100644 (file)
@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_txns,
     s.stream_count,
     s.stream_bytes,
+    s.total_txns,
+    s.total_bytes,
     s.stats_reset
-   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+   FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
 pg_stat_slru| SELECT s.name,
     s.blks_zeroed,
     s.blks_hit,