# typical installcheck users do not have (e.g. buildfarm clients).
NO_INSTALLCHECK = 1
+TAP_TESTS = 1
+
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
- -- check to see if all updates have been reset/updated
- SELECT CASE WHEN check_reset THEN (spill_txns = 0)
- ELSE (spill_txns > 0)
- END
- INTO updated
- FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+ IF check_spill_txns THEN
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+ ELSE (spill_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ ELSE
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (total_txns = 0)
+ ELSE (total_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ END IF;
exit WHEN updated;
-- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | t | t
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t | t | t | t
(1 row)
-- reset the slot stats, and wait for stats collector to reset
(1 row)
-SELECT wait_for_decode_stats(true);
+SELECT wait_for_decode_stats(true, true);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | 0 | 0
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | 0 | 0 | 0 | 0
(1 row)
-- decode and check stats again.
5002
(1 row)
-SELECT wait_for_decode_stats(false);
+SELECT wait_for_decode_stats(false, true);
+ wait_for_decode_stats
+-----------------------
+
+(1 row)
+
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | t | t | t | t
+(1 row)
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+ pg_stat_reset_replication_slot
+--------------------------------
+
+(1 row)
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
wait_for_decode_stats
-----------------------
(1 row)
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
- slot_name | spill_txns | spill_count
------------------+------------+-------------
- regression_slot | t | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+ slot_name | spill_txns | spill_count | total_txns | total_bytes
+-----------------+------------+-------------+------------+-------------
+ regression_slot | f | f | t | t
(1 row)
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
(1 row)
COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
CREATE TABLE stats_test(data text);
-- function to wait for counters to advance
-CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$
+CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE
start_time timestamptz := clock_timestamp();
updated bool;
-- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP
- -- check to see if all updates have been reset/updated
- SELECT CASE WHEN check_reset THEN (spill_txns = 0)
- ELSE (spill_txns > 0)
- END
- INTO updated
- FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+ IF check_spill_txns THEN
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (spill_txns = 0)
+ ELSE (spill_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ ELSE
+
+ -- check to see if all updates have been reset/updated
+ SELECT CASE WHEN check_reset THEN (total_txns = 0)
+ ELSE (total_txns > 0)
+ END
+ INTO updated
+ FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
+
+ END IF;
exit WHEN updated;
-- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction.
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- reset the slot stats, and wait for stats collector to reset
SELECT pg_stat_reset_replication_slot('regression_slot');
-SELECT wait_for_decode_stats(true);
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(true, true);
+SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
-- decode and check stats again.
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
-SELECT wait_for_decode_stats(false);
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT wait_for_decode_stats(false, true);
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
+
+SELECT pg_stat_reset_replication_slot('regression_slot');
+
+-- non-spilled xact
+INSERT INTO stats_test values(generate_series(1, 10));
+SELECT wait_for_decode_stats(false, false);
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
SELECT slot_name FROM pg_stat_replication_slots;
COMMIT;
-DROP FUNCTION wait_for_decode_stats(bool);
+DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot');
--- /dev/null
+# Test replication statistics data in pg_stat_replication_slots is sane after
+# drop replication slot and restart.
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 1;
+
+# Test set-up
+my $node = get_new_node('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'synchronous_commit = on');
+$node->start;
+
+# Create table.
+$node->safe_psql('postgres',
+ "CREATE TABLE test_repl_stat(col1 int)");
+
+# Create replication slots.
+$node->safe_psql(
+ 'postgres', qq[
+ SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot2', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot3', 'test_decoding');
+ SELECT pg_create_logical_replication_slot('regression_slot4', 'test_decoding');
+]);
+
+# Insert some data.
+$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
+
+$node->safe_psql(
+ 'postgres', qq[
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL,
+ NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+]);
+
+# Wait for the statistics to be updated.
+$node->poll_query_until(
+ 'postgres', qq[
+ SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
+ WHERE slot_name ~ 'regression_slot'
+ AND total_txns > 0 AND total_bytes > 0;
+]) or die "Timed out while waiting for statistics to be updated";
+
+# Test to drop one of the replication slot and verify replication statistics data is
+# fine after restart.
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
+
+$node->stop;
+$node->start;
+
+# Verify statistics data present in pg_stat_replication_slots are sane after
+# restart.
+my $result = $node->safe_psql('postgres',
+ "SELECT slot_name, total_txns > 0 AS total_txn,
+ total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
+ ORDER BY slot_name"
+);
+is($result, qq(regression_slot1|t|t
+regression_slot2|t|t
+regression_slot3|t|t), 'check replication statistics are updated');
+
+# cleanup
+$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
+$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
+
+# shutdown
+$node->stop;
</entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>total_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of decoded transactions sent to the decoding output plugin for
+ this slot. This counter is used to maintain the top level transactions,
+ so the counter is not incremented for subtransactions. Note that this
+ includes the transactions that are streamed and/or spilled.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>total_bytes</structfield><type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded transactions data sent to the decoding output plugin
+ while decoding the changes from WAL for this slot. This can be used to
+ gauge the total amount of data sent during logical decoding. Note that
+ this includes the data that is streamed and/or spilled.
+ </para>
+ </entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
msg.m_stream_txns = repSlotStat->stream_txns;
msg.m_stream_count = repSlotStat->stream_count;
msg.m_stream_bytes = repSlotStat->stream_bytes;
+ msg.m_total_txns = repSlotStat->total_txns;
+ msg.m_total_bytes = repSlotStat->total_bytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
replSlotStats[idx].stream_txns += msg->m_stream_txns;
replSlotStats[idx].stream_count += msg->m_stream_count;
replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
+ replSlotStats[idx].total_txns += msg->m_total_txns;
+ replSlotStats[idx].total_bytes += msg->m_total_bytes;
}
}
replSlotStats[i].stream_txns = 0;
replSlotStats[i].stream_count = 0;
replSlotStats[i].stream_bytes = 0;
+ replSlotStats[i].total_txns = 0;
+ replSlotStats[i].total_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
}
ReorderBuffer *rb = ctx->reorder;
PgStat_ReplSlotStats repSlotStat;
- /*
- * Nothing to do if we haven't spilled or streamed anything since the last
- * time the stats has been sent.
- */
- if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
+ /* Nothing to do if we don't have any replication stats to be sent. */
+ if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
return;
- elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
+ elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
(long long) rb->spillBytes,
(long long) rb->streamTxns,
(long long) rb->streamCount,
- (long long) rb->streamBytes);
+ (long long) rb->streamBytes,
+ (long long) rb->totalTxns,
+ (long long) rb->totalBytes);
namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
repSlotStat.spill_txns = rb->spillTxns;
repSlotStat.stream_txns = rb->streamTxns;
repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes;
+ repSlotStat.total_txns = rb->totalTxns;
+ repSlotStat.total_bytes = rb->totalBytes;
pgstat_report_replslot(&repSlotStat);
+
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
rb->streamTxns = 0;
rb->streamCount = 0;
rb->streamBytes = 0;
+ rb->totalTxns = 0;
+ rb->totalBytes = 0;
}
buffer->streamTxns = 0;
buffer->streamCount = 0;
buffer->streamBytes = 0;
+ buffer->totalTxns = 0;
+ buffer->totalBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node);
+ /*
+ * Update the total bytes processed before releasing the current set
+ * of changes and restoring the new set of changes.
+ */
+ rb->totalBytes += rb->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno))
{
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;
+ /*
+ * Update total transaction count and total transaction bytes
+ * processed. Ensure to not count the streamed transaction multiple
+ * times.
+ *
+ * Note that the statistics computation has to be done after
+ * ReorderBufferIterTXNFinish as it releases the serialized change
+ * which we have already accounted in ReorderBufferIterTXNNext.
+ */
+ if (!rbtxn_is_streamed(txn))
+ rb->totalTxns++;
+
+ rb->totalBytes += rb->size;
+
/*
* Done with current changes, send the last message for this set of
* changes depending upon streaming mode.
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
values[4] = Int64GetDatum(s->stream_txns);
values[5] = Int64GetDatum(s->stream_count);
values[6] = Int64GetDatum(s->stream_bytes);
+ values[7] = Int64GetDatum(s->total_txns);
+ values[8] = Int64GetDatum(s->total_bytes);
if (s->stat_reset_timestamp == 0)
- nulls[7] = true;
+ nulls[9] = true;
else
- values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
+ values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202104151
+#define CATALOG_VERSION_NO 202104161
#endif
proname => 'pg_stat_get_replication_slots', prorows => '10',
proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
PgStat_Counter m_stream_txns;
PgStat_Counter m_stream_count;
PgStat_Counter m_stream_bytes;
+ PgStat_Counter m_total_txns;
+ PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot;
/* ----------
PgStat_Counter stream_txns;
PgStat_Counter stream_count;
PgStat_Counter stream_bytes;
+ PgStat_Counter total_txns;
+ PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data streamed */
+
+ /*
+ * Statistics about all the transactions sent to the decoding output
+ * plugin
+ */
+ int64 totalTxns; /* total number of transactions sent */
+ int64 totalBytes; /* total amount of data sent */
};
s.stream_txns,
s.stream_count,
s.stream_bytes,
+ s.total_txns,
+ s.total_bytes,
s.stats_reset
- FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
+ FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,