summaryrefslogtreecommitdiff
path: root/src/backend/postmaster
diff options
context:
space:
mode:
authorAmit Kapila2020-10-08 03:39:08 +0000
committerAmit Kapila2020-10-08 03:39:08 +0000
commit98681675002d852d926a49d7bc4d4b4856b2fc4a (patch)
treee30f3bd90ae90d11b6a1ac4a7d705f6adfb6dd50 /src/backend/postmaster
parent8d2a01ae12cd657b33ffd50eace86a341636c586 (diff)
Track statistics for spilling of changes from ReorderBuffer.
This adds the statistics about transactions spilled to disk from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats and call pg_stat_reset_replication_slot to reset the stats of a particular slot. Users can pass NULL in pg_stat_reset_replication_slot to reset stats of all the slots. This commit extends the statistics collector to track this information about slots. Author: Sawada Masahiko and Amit Kapila Reviewed-by: Amit Kapila and Dilip Kumar Discussion: https://postgr.es/m/CA+fd4k5_pPAYRTDrO2PbtTOe0eHQpBvuqmCr8ic39uTNmR49Eg@mail.gmail.com
Diffstat (limited to 'src/backend/postmaster')
-rw-r--r--src/backend/postmaster/pgstat.c320
1 files changed, 320 insertions, 0 deletions
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 5294c785494..822f0ebc628 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -51,6 +51,7 @@
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/backendid.h"
#include "storage/dsm.h"
@@ -284,6 +285,8 @@ static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_ReplSlotStats *replSlotStats;
+static int nReplSlotStats;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
@@ -324,6 +327,9 @@ static void pgstat_read_current_status(void);
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);
+static int pgstat_replslot_index(const char *name, bool create_it);
+static void pgstat_reset_replslot(int i, TimestampTz ts);
+
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
@@ -350,6 +356,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
+static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
@@ -362,6 +369,7 @@ static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
+static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
/* ------------------------------------------------------------
@@ -1438,6 +1446,61 @@ pgstat_reset_slru_counter(const char *name)
}
/* ----------
+ * pgstat_reset_replslot_counter() -
+ *
+ * Tell the statistics collector to reset a single replication slot
+ * counter, or all replication slots counters (when name is null).
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ * ----------
+ */
+void
+pgstat_reset_replslot_counter(const char *name)
+{
+ PgStat_MsgResetreplslotcounter msg;
+
+ if (pgStatSock == PGINVALID_SOCKET)
+ return;
+
+ if (name)
+ {
+ ReplicationSlot *slot;
+
+ /*
+ * Check if the slot exits with the given name. It is possible that by
+ * the time this message is executed the slot is dropped but at least
+ * this check will ensure that the given name is for a valid slot.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ slot = SearchNamedReplicationSlot(name);
+ LWLockRelease(ReplicationSlotControlLock);
+
+ if (!slot)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" does not exist",
+ name)));
+
+ /*
+ * Nothing to do for physical slots as we collect stats only for
+ * logical slots.
+ */
+ if (SlotIsPhysical(slot))
+ return;
+
+ memcpy(&msg.m_slotname, name, NAMEDATALEN);
+ msg.clearall = false;
+ }
+ else
+ msg.clearall = true;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
+
+ pgstat_send(&msg, sizeof(msg));
+}
+
+/* ----------
* pgstat_report_autovac() -
*
* Called from autovacuum.c to report startup of an autovacuum process.
@@ -1637,6 +1700,46 @@ pgstat_report_tempfile(size_t filesize)
pgstat_send(&msg, sizeof(msg));
}
+/* ----------
+ * pgstat_report_replslot() -
+ *
+ * Tell the collector about replication slot statistics.
+ * ----------
+ */
+void
+pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+ int spillbytes)
+{
+ PgStat_MsgReplSlot msg;
+
+ /*
+ * Prepare and send the message
+ */
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = false;
+ msg.m_spill_txns = spilltxns;
+ msg.m_spill_count = spillcount;
+ msg.m_spill_bytes = spillbytes;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
+ * pgstat_report_replslot_drop() -
+ *
+ * Tell the collector about dropping the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_drop(const char *slotname)
+{
+ PgStat_MsgReplSlot msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = true;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
/* ----------
* pgstat_ping() -
@@ -2714,6 +2817,23 @@ pgstat_fetch_slru(void)
return slruStats;
}
+/*
+ * ---------
+ * pgstat_fetch_replslot() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * a pointer to the replication slot statistics struct and sets the
+ * number of entries in nslots_p.
+ * ---------
+ */
+PgStat_ReplSlotStats *
+pgstat_fetch_replslot(int *nslots_p)
+{
+ backend_read_statsfile();
+
+ *nslots_p = nReplSlotStats;
+ return replSlotStats;
+}
/* ------------------------------------------------------------
* Functions for management of the shared-memory PgBackendStatus array
@@ -4693,6 +4813,11 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
+ case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
+ pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
+ len);
+ break;
+
case PGSTAT_MTYPE_AUTOVAC_START:
pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
break;
@@ -4747,6 +4872,10 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
+ case PGSTAT_MTYPE_REPLSLOT:
+ pgstat_recv_replslot(&msg.msg_replslot, len);
+ break;
+
default:
break;
}
@@ -4946,6 +5075,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
+ int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
@@ -5026,6 +5156,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
}
/*
+ * Write replication slot stats struct
+ */
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ fputc('R', fpout);
+ rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
+ /*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
* after each individual fputc or fwrite above.
@@ -5250,6 +5390,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* Allocate the space for replication slot statistics */
+ replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats = 0;
+
/*
* Clear out global, archiver, WAL and SLRU statistics so they start from
* zero in case we can't load an existing statsfile.
@@ -5274,6 +5418,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
+ * Set the same reset timestamp for all replication slots too.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
+
+ /*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
* with empty counters.
@@ -5447,6 +5597,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication
+ * slot follows.
+ */
+ case 'R':
+ if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ goto done;
+ }
+ nReplSlotStats++;
+ break;
+
case 'E':
goto done;
@@ -5658,6 +5825,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_ArchiverStats myArchiverStats;
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+ PgStat_ReplSlotStats myReplSlotStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -5772,6 +5940,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication
+ * slot follows.
+ */
+ case 'R':
+ if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ FreeFile(fpin);
+ return false;
+ }
+ break;
+
case 'E':
goto done;
@@ -6368,6 +6552,46 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
}
/* ----------
+ * pgstat_recv_resetreplslotcounter() -
+ *
+ * Reset some replication slot statistics of the cluster.
+ * ----------
+ */
+static void
+pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
+ int len)
+{
+ int i;
+ int idx = -1;
+ TimestampTz ts;
+
+ ts = GetCurrentTimestamp();
+ if (msg->clearall)
+ {
+ for (i = 0; i < nReplSlotStats; i++)
+ pgstat_reset_replslot(i, ts);
+ }
+ else
+ {
+ /* Get the index of replication slot statistics to reset */
+ idx = pgstat_replslot_index(msg->m_slotname, false);
+
+ /*
+ * Nothing to do if the given slot entry is not found. This could
+ * happen when the slot with the given name is removed and the
+ * corresponding statistics entry is also removed before receiving the
+ * reset message.
+ */
+ if (idx < 0)
+ return;
+
+ /* Reset the stats for the requested replication slot */
+ pgstat_reset_replslot(idx, ts);
+ }
+}
+
+
+/* ----------
* pgstat_recv_autovac() -
*
* Process an autovacuum signaling message.
@@ -6627,6 +6851,51 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
}
/* ----------
+ * pgstat_recv_replslot() -
+ *
+ * Process a REPLSLOT message.
+ * ----------
+ */
+static void
+pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
+{
+ int idx;
+
+ /*
+ * Get the index of replication slot statistics. On dropping, we don't
+ * create the new statistics.
+ */
+ idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+
+ /*
+ * The slot entry is not found or there is no space to accommodate the new
+ * entry. This could happen when the message for the creation of a slot
+ * reached before the drop message even though the actual operations
+ * happen in reverse order. In such a case, the next update of the
+ * statistics for the same slot will create the required entry.
+ */
+ if (idx < 0)
+ return;
+
+ Assert(idx >= 0 && idx <= max_replication_slots);
+ if (msg->m_drop)
+ {
+ /* Remove the replication slot statistics with the given name */
+ memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
+ sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats--;
+ Assert(nReplSlotStats >= 0);
+ }
+ else
+ {
+ /* Update the replication slot statistics */
+ replSlotStats[idx].spill_txns += msg->m_spill_txns;
+ replSlotStats[idx].spill_count += msg->m_spill_count;
+ replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ }
+}
+
+/* ----------
* pgstat_recv_tempfile() -
*
* Process a TEMPFILE message.
@@ -6808,6 +7077,57 @@ pgstat_clip_activity(const char *raw_activity)
return activity;
}
+/* ----------
+ * pgstat_replslot_index
+ *
+ * Return the index of entry of a replication slot with the given name, or
+ * -1 if the slot is not found.
+ *
+ * create_it tells whether to create the new slot entry if it is not found.
+ * ----------
+ */
+static int
+pgstat_replslot_index(const char *name, bool create_it)
+{
+ int i;
+
+ Assert(nReplSlotStats <= max_replication_slots);
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ if (strcmp(replSlotStats[i].slotname, name) == 0)
+ return i; /* found */
+ }
+
+ /*
+ * The slot is not found. We don't want to register the new statistics if
+ * the list is already full or the caller didn't request.
+ */
+ if (i == max_replication_slots || !create_it)
+ return -1;
+
+ /* Register new slot */
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+
+ return nReplSlotStats++;
+}
+
+/* ----------
+ * pgstat_reset_replslot
+ *
+ * Reset the replication slot stats at index 'i'.
+ * ----------
+ */
+static void
+pgstat_reset_replslot(int i, TimestampTz ts)
+{
+ /* reset only counters. Don't clear slot name */
+ replSlotStats[i].spill_txns = 0;
+ replSlotStats[i].spill_count = 0;
+ replSlotStats[i].spill_bytes = 0;
+ replSlotStats[i].stat_reset_timestamp = ts;
+}
+
/*
* pgstat_slru_index
*