diff options
| author | Amit Kapila | 2020-10-29 03:41:51 +0000 |
|---|---|---|
| committer | Amit Kapila | 2020-10-29 03:41:51 +0000 |
| commit | 8e90ec5580d5345fef31005d7cc2215ba2125070 (patch) | |
| tree | 0aeb74ce7a64214df6920964fa6ceea9f4e0086d /src/backend | |
| parent | 94bc27b57680b4e757577e3f5b65dc32f96d33c1 (diff) | |
Track statistics for streaming of changes from ReorderBuffer.
This adds the statistics about transactions streamed to the decoding
output plugin from ReorderBuffer. Users can query the
pg_stat_replication_slots view to check these stats and call
pg_stat_reset_replication_slot to reset the stats of a particular slot.
Users can pass NULL in pg_stat_reset_replication_slot to reset stats of
all the slots.
Commit 9868167500 has added the basic infrastructure to capture the stats
of slot and this commit extends the statistics collector to track
additional information about slots.
Bump the catversion as we have added new columns in the catalog entry.
Author: Ajin Cherian and Amit Kapila
Reviewed-by: Sawada Masahiko and Dilip Kumar
Discussion: https://postgr.es/m/CAA4eK1+chpEomLzgSoky-D31qev19AmECNiEAietPQUGEFhtVA@mail.gmail.com
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/catalog/system_views.sql | 3 | ||||
| -rw-r--r-- | src/backend/postmaster/pgstat.c | 11 | ||||
| -rw-r--r-- | src/backend/replication/logical/logical.c | 19 | ||||
| -rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 20 | ||||
| -rw-r--r-- | src/backend/replication/slot.c | 2 | ||||
| -rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 9 |
6 files changed, 53 insertions, 11 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c6dd084fbcc..5171ea05c7e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS s.spill_txns, s.spill_count, s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, s.stats_reset FROM pg_stat_get_replication_slots() AS s; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 822f0ebc628..f1dca2f25b7 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize) */ void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes) + int spillbytes, int streamtxns, int streamcount, int streambytes) { PgStat_MsgReplSlot msg; @@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, msg.m_spill_txns = spilltxns; msg.m_spill_count = spillcount; msg.m_spill_bytes = spillbytes; + msg.m_stream_txns = streamtxns; + msg.m_stream_count = streamcount; + msg.m_stream_bytes = streambytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) replSlotStats[idx].spill_txns += msg->m_spill_txns; replSlotStats[idx].spill_count += msg->m_spill_count; replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + replSlotStats[idx].stream_txns += msg->m_stream_txns; + replSlotStats[idx].stream_count += msg->m_stream_count; + replSlotStats[idx].stream_bytes += msg->m_stream_bytes; } } @@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts) replSlotStats[i].spill_txns = 0; replSlotStats[i].spill_count = 0; replSlotStats[i].spill_bytes = 0; + replSlotStats[i].stream_txns = 0; + replSlotStats[i].stream_count = 0; + replSlotStats[i].stream_bytes = 0; replSlotStats[i].stat_reset_timestamp = ts; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8675832f4d6..d5cfbeaa4af 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) ReorderBuffer *rb = ctx->reorder; /* - * Nothing to do if we haven't spilled anything since the last time the - * stats has been sent. + * Nothing to do if we haven't spilled or streamed anything since the last + * time the stats has been sent. */ - if (rb->spillBytes <= 0) + if (rb->spillBytes <= 0 && rb->streamBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld", + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, - (long long) rb->spillBytes); + (long long) rb->spillBytes, + (long long) rb->streamTxns, + (long long) rb->streamCount, + (long long) rb->streamBytes); pgstat_report_replslot(NameStr(ctx->slot->data.name), - rb->spillTxns, rb->spillCount, rb->spillBytes); + rb->spillTxns, rb->spillCount, rb->spillBytes, + rb->streamTxns, rb->streamCount, rb->streamBytes); rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; + rb->streamTxns = 0; + rb->streamCount = 0; + rb->streamBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 7a8bf760791..c1bd68011c5 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -346,6 +346,9 @@ ReorderBufferAllocate(void) buffer->spillTxns = 0; buffer->spillCount = 0; buffer->spillBytes = 0; + buffer->streamTxns = 0; + buffer->streamCount = 0; + buffer->streamBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { Snapshot snapshot_now; CommandId command_id; + Size stream_bytes; + bool txn_is_streamed; /* We can never reach here for a subtransaction. */ Assert(txn->toptxn == NULL); @@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->snapshot_now = NULL; } + /* + * Remember this information to be used later to update stats. We can't + * update the stats here as an error while processing the changes would + * lead to the accumulation of stats even though we haven't streamed all + * the changes. + */ + txn_is_streamed = rbtxn_is_streamed(txn); + stream_bytes = txn->total_size; + /* Process and send the changes to output plugin. */ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); + rb->streamCount += 1; + rb->streamBytes += stream_bytes; + + /* Don't consider already streamed transaction. */ + rb->streamTxns += (txn_is_streamed) ? 0 : 1; + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 220b4cd6e99..09be1d8c485 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. */ if (SlotIsLogical(slot)) - pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0); + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0); /* * Now that the slot has been marked as in_use and active, it's safe to diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 472fa596e1f..a210fc93b41 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 5 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 8 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) values[1] = Int64GetDatum(s->spill_txns); values[2] = Int64GetDatum(s->spill_count); values[3] = Int64GetDatum(s->spill_bytes); + values[4] = Int64GetDatum(s->stream_txns); + values[5] = Int64GetDatum(s->stream_count); + values[6] = Int64GetDatum(s->stream_bytes); if (s->stat_reset_timestamp == 0) - nulls[4] = true; + nulls[7] = true; else - values[4] = TimestampTzGetDatum(s->stat_reset_timestamp); + values[7] = TimestampTzGetDatum(s->stat_reset_timestamp); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } |
