diff options
| author | Amit Kapila | 2020-10-08 03:39:08 +0000 |
|---|---|---|
| committer | Amit Kapila | 2020-10-08 03:39:08 +0000 |
| commit | 98681675002d852d926a49d7bc4d4b4856b2fc4a (patch) | |
| tree | e30f3bd90ae90d11b6a1ac4a7d705f6adfb6dd50 /src/backend/postmaster | |
| parent | 8d2a01ae12cd657b33ffd50eace86a341636c586 (diff) | |
Track statistics for spilling of changes from ReorderBuffer.
This adds the statistics about transactions spilled to disk from
ReorderBuffer. Users can query the pg_stat_replication_slots view to check
these stats and call pg_stat_reset_replication_slot to reset the stats of
a particular slot. Users can pass NULL in pg_stat_reset_replication_slot
to reset stats of all the slots.
This commit extends the statistics collector to track this information
about slots.
Author: Sawada Masahiko and Amit Kapila
Reviewed-by: Amit Kapila and Dilip Kumar
Discussion: https://postgr.es/m/CA+fd4k5_pPAYRTDrO2PbtTOe0eHQpBvuqmCr8ic39uTNmR49Eg@mail.gmail.com
Diffstat (limited to 'src/backend/postmaster')
| -rw-r--r-- | src/backend/postmaster/pgstat.c | 320 |
1 files changed, 320 insertions, 0 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 5294c785494..822f0ebc628 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -51,6 +51,7 @@ #include "postmaster/fork_process.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/backendid.h" #include "storage/dsm.h" @@ -284,6 +285,8 @@ static PgStat_ArchiverStats archiverStats; static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; +static PgStat_ReplSlotStats *replSlotStats; +static int nReplSlotStats; /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, @@ -324,6 +327,9 @@ static void pgstat_read_current_status(void); static bool pgstat_write_statsfile_needed(void); static bool pgstat_db_requested(Oid databaseid); +static int pgstat_replslot_index(const char *name, bool create_it); +static void pgstat_reset_replslot(int i, TimestampTz ts); + static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); @@ -350,6 +356,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len); static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len); static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); +static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len); static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); @@ -362,6 +369,7 @@ static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len); static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len); static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len); static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len); +static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); /* ------------------------------------------------------------ @@ -1438,6 +1446,61 @@ pgstat_reset_slru_counter(const char *name) } /* ---------- + * pgstat_reset_replslot_counter() - + * + * Tell the statistics collector to reset a single replication slot + * counter, or all replication slots counters (when name is null). + * + * Permission checking for this function is managed through the normal + * GRANT system. + * ---------- + */ +void +pgstat_reset_replslot_counter(const char *name) +{ + PgStat_MsgResetreplslotcounter msg; + + if (pgStatSock == PGINVALID_SOCKET) + return; + + if (name) + { + ReplicationSlot *slot; + + /* + * Check if the slot exits with the given name. It is possible that by + * the time this message is executed the slot is dropped but at least + * this check will ensure that the given name is for a valid slot. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + slot = SearchNamedReplicationSlot(name); + LWLockRelease(ReplicationSlotControlLock); + + if (!slot) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" does not exist", + name))); + + /* + * Nothing to do for physical slots as we collect stats only for + * logical slots. + */ + if (SlotIsPhysical(slot)) + return; + + memcpy(&msg.m_slotname, name, NAMEDATALEN); + msg.clearall = false; + } + else + msg.clearall = true; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER); + + pgstat_send(&msg, sizeof(msg)); +} + +/* ---------- * pgstat_report_autovac() - * * Called from autovacuum.c to report startup of an autovacuum process. @@ -1637,6 +1700,46 @@ pgstat_report_tempfile(size_t filesize) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_report_replslot() - + * + * Tell the collector about replication slot statistics. + * ---------- + */ +void +pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, + int spillbytes) +{ + PgStat_MsgReplSlot msg; + + /* + * Prepare and send the message + */ + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = false; + msg.m_spill_txns = spilltxns; + msg.m_spill_count = spillcount; + msg.m_spill_bytes = spillbytes; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} + +/* ---------- + * pgstat_report_replslot_drop() - + * + * Tell the collector about dropping the replication slot. + * ---------- + */ +void +pgstat_report_replslot_drop(const char *slotname) +{ + PgStat_MsgReplSlot msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); + memcpy(&msg.m_slotname, slotname, NAMEDATALEN); + msg.m_drop = true; + pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); +} /* ---------- * pgstat_ping() - @@ -2714,6 +2817,23 @@ pgstat_fetch_slru(void) return slruStats; } +/* + * --------- + * pgstat_fetch_replslot() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the replication slot statistics struct and sets the + * number of entries in nslots_p. + * --------- + */ +PgStat_ReplSlotStats * +pgstat_fetch_replslot(int *nslots_p) +{ + backend_read_statsfile(); + + *nslots_p = nReplSlotStats; + return replSlotStats; +} /* ------------------------------------------------------------ * Functions for management of the shared-memory PgBackendStatus array @@ -4693,6 +4813,11 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER: + pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter, + len); + break; + case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac(&msg.msg_autovacuum_start, len); break; @@ -4747,6 +4872,10 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_REPLSLOT: + pgstat_recv_replslot(&msg.msg_replslot, len); + break; + default: break; } @@ -4946,6 +5075,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; int rc; + int i; elog(DEBUG2, "writing stats file \"%s\"", statfile); @@ -5026,6 +5156,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) } /* + * Write replication slot stats struct + */ + for (i = 0; i < nReplSlotStats; i++) + { + fputc('R', fpout); + rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error * after each individual fputc or fwrite above. @@ -5250,6 +5390,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* Allocate the space for replication slot statistics */ + replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats)); + nReplSlotStats = 0; + /* * Clear out global, archiver, WAL and SLRU statistics so they start from * zero in case we can't load an existing statsfile. @@ -5274,6 +5418,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; /* + * Set the same reset timestamp for all replication slots too. + */ + for (i = 0; i < max_replication_slots; i++) + replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp; + + /* * Try to open the stats file. If it doesn't exist, the backends simply * return zero for anything and the collector simply starts from scratch * with empty counters. @@ -5447,6 +5597,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication + * slot follows. + */ + case 'R': + if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + goto done; + } + nReplSlotStats++; + break; + case 'E': goto done; @@ -5658,6 +5825,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_ArchiverStats myArchiverStats; PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; + PgStat_ReplSlotStats myReplSlotStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -5772,6 +5940,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, break; + /* + * 'R' A PgStat_ReplSlotStats struct describing a replication + * slot follows. + */ + case 'R': + if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin) + != sizeof(PgStat_ReplSlotStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + FreeFile(fpin); + return false; + } + break; + case 'E': goto done; @@ -6368,6 +6552,46 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len) } /* ---------- + * pgstat_recv_resetreplslotcounter() - + * + * Reset some replication slot statistics of the cluster. + * ---------- + */ +static void +pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, + int len) +{ + int i; + int idx = -1; + TimestampTz ts; + + ts = GetCurrentTimestamp(); + if (msg->clearall) + { + for (i = 0; i < nReplSlotStats; i++) + pgstat_reset_replslot(i, ts); + } + else + { + /* Get the index of replication slot statistics to reset */ + idx = pgstat_replslot_index(msg->m_slotname, false); + + /* + * Nothing to do if the given slot entry is not found. This could + * happen when the slot with the given name is removed and the + * corresponding statistics entry is also removed before receiving the + * reset message. + */ + if (idx < 0) + return; + + /* Reset the stats for the requested replication slot */ + pgstat_reset_replslot(idx, ts); + } +} + + +/* ---------- * pgstat_recv_autovac() - * * Process an autovacuum signaling message. @@ -6627,6 +6851,51 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len) } /* ---------- + * pgstat_recv_replslot() - + * + * Process a REPLSLOT message. + * ---------- + */ +static void +pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) +{ + int idx; + + /* + * Get the index of replication slot statistics. On dropping, we don't + * create the new statistics. + */ + idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop); + + /* + * The slot entry is not found or there is no space to accommodate the new + * entry. This could happen when the message for the creation of a slot + * reached before the drop message even though the actual operations + * happen in reverse order. In such a case, the next update of the + * statistics for the same slot will create the required entry. + */ + if (idx < 0) + return; + + Assert(idx >= 0 && idx <= max_replication_slots); + if (msg->m_drop) + { + /* Remove the replication slot statistics with the given name */ + memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1], + sizeof(PgStat_ReplSlotStats)); + nReplSlotStats--; + Assert(nReplSlotStats >= 0); + } + else + { + /* Update the replication slot statistics */ + replSlotStats[idx].spill_txns += msg->m_spill_txns; + replSlotStats[idx].spill_count += msg->m_spill_count; + replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + } +} + +/* ---------- * pgstat_recv_tempfile() - * * Process a TEMPFILE message. @@ -6808,6 +7077,57 @@ pgstat_clip_activity(const char *raw_activity) return activity; } +/* ---------- + * pgstat_replslot_index + * + * Return the index of entry of a replication slot with the given name, or + * -1 if the slot is not found. + * + * create_it tells whether to create the new slot entry if it is not found. + * ---------- + */ +static int +pgstat_replslot_index(const char *name, bool create_it) +{ + int i; + + Assert(nReplSlotStats <= max_replication_slots); + for (i = 0; i < nReplSlotStats; i++) + { + if (strcmp(replSlotStats[i].slotname, name) == 0) + return i; /* found */ + } + + /* + * The slot is not found. We don't want to register the new statistics if + * the list is already full or the caller didn't request. + */ + if (i == max_replication_slots || !create_it) + return -1; + + /* Register new slot */ + memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); + memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN); + + return nReplSlotStats++; +} + +/* ---------- + * pgstat_reset_replslot + * + * Reset the replication slot stats at index 'i'. + * ---------- + */ +static void +pgstat_reset_replslot(int i, TimestampTz ts) +{ + /* reset only counters. Don't clear slot name */ + replSlotStats[i].spill_txns = 0; + replSlotStats[i].spill_count = 0; + replSlotStats[i].spill_bytes = 0; + replSlotStats[i].stat_reset_timestamp = ts; +} + /* * pgstat_slru_index * |
