diff options
| author | Amit Kapila | 2021-11-30 03:24:30 +0000 |
|---|---|---|
| committer | Amit Kapila | 2021-11-30 03:24:30 +0000 |
| commit | 8d74fc96db5fd547e077bf9bf4c3b67f821d71cd (patch) | |
| tree | 3037345a7edabd025edcc5d9b431fb14f780e817 /src/backend | |
| parent | 98105e53e0ab472b7721a3e8d7b9f1750a635120 (diff) | |
Add a view to show the stats of subscription workers.
This commit adds a new system view, pg_stat_subscription_workers, which
shows information about any errors that occur while applying logical
replication changes as well as while performing initial table
synchronization. The subscription statistics entries are removed when the
corresponding subscription is removed.
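It also adds an SQL function pg_stat_reset_subscription_worker() to reset
the error statistics of a single subscription worker (given a subscription
OID and a relation OID) or of all workers of a subscription (given only the
subscription OID).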
The contents of this view can be used by an upcoming patch that skips the
particular transaction that conflicts with the existing data on the
subscriber.
This view can be extended in the future to track other transaction-related
statistics, such as the number of transactions committed/aborted by
subscription workers.
Author: Masahiko Sawada
Reviewed-by: Greg Nancarrow, Hou Zhijie, Tang Haiying, Vignesh C, Dilip Kumar, Takamichi Osumi, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDeScrsHhLyEPYqN3sydg6PxAPVBboK=30xJfUVihNZDA@mail.gmail.com
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/catalog/system_functions.sql | 4 | ||||
| -rw-r--r-- | src/backend/catalog/system_views.sql | 23 | ||||
| -rw-r--r-- | src/backend/commands/subscriptioncmds.c | 16 | ||||
| -rw-r--r-- | src/backend/postmaster/pgstat.c | 409 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 54 | ||||
| -rw-r--r-- | src/backend/utils/adt/pgstatfuncs.c | 128 |
6 files changed, 613 insertions, 21 deletions
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index f6789025a5f..3a4fa9091b1 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -639,6 +639,10 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public; +REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid) FROM public; + +REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid, oid) FROM public; + REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index eb560955cda..61b515cdb85 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1261,3 +1261,26 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; + +CREATE VIEW pg_stat_subscription_workers AS + SELECT + w.subid, + s.subname, + w.subrelid, + w.last_error_relid, + w.last_error_command, + w.last_error_xid, + w.last_error_count, + w.last_error_message, + w.last_error_time + FROM (SELECT + oid as subid, + NULL as relid + FROM pg_subscription + UNION ALL + SELECT + srsubid as subid, + srrelid as relid + FROM pg_subscription_rel) sr, + LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w + JOIN pg_subscription s ON (w.subid = s.oid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index c47ba263695..9427e86fee1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -32,6 +32,7 @@ #include "executor/executor.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "pgstat.h" 
#include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/slot.h" @@ -1204,7 +1205,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * Since dropping a replication slot is not transactional, the replication * slot stays dropped even if the transaction rolls back. So we cannot * run DROP SUBSCRIPTION inside a transaction block if dropping the - * replication slot. + * replication slot. Also, in this case, we report a message for dropping + * the subscription to the stats collector. * * XXX The command name should really be something like "DROP SUBSCRIPTION * of a subscription that is associated with a replication slot", but we @@ -1377,6 +1379,18 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } PG_END_TRY(); + /* + * Send a message for dropping this subscription to the stats collector. + * We can safely report dropping the subscription statistics here if the + * subscription is associated with a replication slot since we cannot run + * DROP SUBSCRIPTION inside a transaction block. Subscription statistics + * will be removed later by (auto)vacuum either if it's not associated + * with a replication slot or if the message for dropping the subscription + * gets lost. 
+ */ + if (slotname) + pgstat_report_subscription_drop(subid); + table_close(rel, NoLock); } diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 8c166e5e161..7264d2c7272 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -41,6 +41,7 @@ #include "catalog/catalog.h" #include "catalog/pg_database.h" #include "catalog/pg_proc.h" +#include "catalog/pg_subscription.h" #include "common/ip.h" #include "executor/instrument.h" #include "libpq/libpq.h" @@ -105,6 +106,7 @@ #define PGSTAT_DB_HASH_SIZE 16 #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 +#define PGSTAT_SUBWORKER_HASH_SIZE 32 #define PGSTAT_REPLSLOT_HASH_SIZE 32 @@ -320,10 +322,14 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create); +static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, + Oid subid, Oid subrelid, + bool create); static void pgstat_write_statsfiles(bool permanent, bool allDbs); static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent); static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep); -static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent); +static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, + HTAB *subworkerhash, bool permanent); static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); @@ -335,6 +341,7 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); +static void 
pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg); static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid); static bool pgstat_should_report_connstat(void); static void pgstat_report_disconnect(Oid dboid); @@ -373,6 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len); static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); +static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1302,6 +1311,74 @@ pgstat_vacuum_stat(void) hash_destroy(htab); } + + /* + * Repeat for subscription workers. Similarly, we needn't bother in the + * common case where no subscription workers' stats are being collected. + */ + if (dbentry->subworkers != NULL && + hash_get_num_entries(dbentry->subworkers) > 0) + { + PgStat_StatSubWorkerEntry *subwentry; + PgStat_MsgSubscriptionPurge spmsg; + + /* + * Read pg_subscription and make a list of OIDs of all existing + * subscriptions + */ + htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); + + spmsg.m_databaseid = MyDatabaseId; + spmsg.m_nentries = 0; + + hash_seq_init(&hstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + { + bool exists = false; + Oid subid = subwentry->key.subid; + + CHECK_FOR_INTERRUPTS(); + + if (hash_search(htab, (void *) &subid, HASH_FIND, NULL) != NULL) + continue; + + /* + * It is possible that we have multiple entries for the + * subscription corresponding to apply worker and tablesync + * workers. In such cases, we don't need to add the same subid + * again. 
+ */ + for (int i = 0; i < spmsg.m_nentries; i++) + { + if (spmsg.m_subids[i] == subid) + { + exists = true; + break; + } + } + + if (exists) + continue; + + /* This subscription is dead, add the subid to the message */ + spmsg.m_subids[spmsg.m_nentries++] = subid; + + /* + * If the message is full, send it out and reinitialize to empty + */ + if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE) + { + pgstat_send_subscription_purge(&spmsg); + spmsg.m_nentries = 0; + } + } + + /* Send the rest of dead subscriptions */ + if (spmsg.m_nentries > 0) + pgstat_send_subscription_purge(&spmsg); + + hash_destroy(htab); + } } @@ -1474,7 +1551,8 @@ pgstat_reset_shared_counters(const char *target) * ---------- */ void -pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) +pgstat_reset_single_counter(Oid objoid, Oid subobjoid, + PgStat_Single_Reset_Type type) { PgStat_MsgResetsinglecounter msg; @@ -1485,6 +1563,7 @@ pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) msg.m_databaseid = MyDatabaseId; msg.m_resettype = type; msg.m_objectid = objoid; + msg.m_subobjectid = subobjoid; pgstat_send(&msg, sizeof(msg)); } @@ -1870,6 +1949,51 @@ pgstat_report_replslot_drop(const char *slotname) } /* ---------- + * pgstat_report_subworker_error() - + * + * Tell the collector about the subscription worker error. 
+ * ---------- + */ +void +pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, TransactionId xid, + const char *errmsg) +{ + PgStat_MsgSubWorkerError msg; + int len; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR); + msg.m_databaseid = MyDatabaseId; + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_relid = relid; + msg.m_command = command; + msg.m_xid = xid; + msg.m_timestamp = GetCurrentTimestamp(); + strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN); + + len = offsetof(PgStat_MsgSubWorkerError, m_message) + strlen(msg.m_message) + 1; + pgstat_send(&msg, len); +} + +/* ---------- + * pgstat_report_subscription_drop() - + * + * Tell the collector about dropping the subscription. + * ---------- + */ +void +pgstat_report_subscription_drop(Oid subid) +{ + PgStat_MsgSubscriptionPurge msg; + + msg.m_databaseid = MyDatabaseId; + msg.m_subids[0] = subid; + msg.m_nentries = 1; + pgstat_send_subscription_purge(&msg); +} + +/* ---------- * pgstat_ping() - * * Send some junk data to the collector to increase traffic. @@ -2874,6 +2998,35 @@ pgstat_fetch_stat_funcentry(Oid func_id) return funcentry; } +/* + * --------- + * pgstat_fetch_stat_subworker_entry() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for subscription worker or NULL. + * --------- + */ +PgStat_StatSubWorkerEntry * +pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *wentry = NULL; + + /* Load the stats file if needed */ + backend_read_statsfile(); + + /* + * Lookup our database, then find the requested subscription worker stats. 
+ */ + dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId); + if (dbentry != NULL && dbentry->subworkers != NULL) + { + wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid, + false); + } + + return wentry; +} /* * --------- @@ -3312,6 +3465,23 @@ pgstat_send_slru(void) } } +/* -------- + * pgstat_send_subscription_purge() - + * + * Send a subscription purge message to the collector + * -------- + */ +static void +pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg) +{ + int len; + + len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0]) + + msg->m_nentries * sizeof(Oid); + + pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); + pgstat_send(msg, len); +} /* ---------- * PgstatCollectorMain() - @@ -3568,6 +3738,14 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_disconnect(&msg.msg_disconnect, len); break; + case PGSTAT_MTYPE_SUBSCRIPTIONPURGE: + pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len); + break; + + case PGSTAT_MTYPE_SUBWORKERERROR: + pgstat_recv_subworker_error(&msg.msg_subworkererror, len); + break; + default: break; } @@ -3613,7 +3791,8 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Subroutine to clear stats in a database entry * - * Tables and functions hashes are initialized to empty. + * Tables, functions, and subscription workers hashes are initialized + * to empty. */ static void reset_dbentry_counters(PgStat_StatDBEntry *dbentry) @@ -3666,6 +3845,13 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) PGSTAT_FUNCTION_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS); + + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + dbentry->subworkers = hash_create("Per-database subscription worker", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); } /* @@ -3690,7 +3876,7 @@ pgstat_get_db_entry(Oid databaseid, bool create) /* * If not found, initialize the new one. 
This creates empty hash tables - * for tables and functions, too. + * for tables, functions, and subscription workers, too. */ if (!found) reset_dbentry_counters(result); @@ -3748,6 +3934,47 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) return result; } +/* ---------- + * pgstat_get_subworker_entry + * + * Return subscription worker entry with the given subscription OID and + * relation OID. If subrelid is InvalidOid, it returns an entry of the + * apply worker otherwise returns an entry of the table sync worker + * associated with subrelid. If no subscription worker entry exists, + * initialize it, if the create parameter is true. Else, return NULL. + * ---------- + */ +static PgStat_StatSubWorkerEntry * +pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid, + bool create) +{ + PgStat_StatSubWorkerEntry *subwentry; + PgStat_StatSubWorkerKey key; + bool found; + HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + + key.subid = subid; + key.subrelid = subrelid; + subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers, + (void *) &key, + action, &found); + + if (!create && !found) + return NULL; + + /* If not found, initialize the new one */ + if (!found) + { + subwentry->last_error_relid = InvalidOid; + subwentry->last_error_command = 0; + subwentry->last_error_xid = InvalidTransactionId; + subwentry->last_error_count = 0; + subwentry->last_error_time = 0; + subwentry->last_error_message[0] = '\0'; + } + + return subwentry; +} /* ---------- * pgstat_write_statsfiles() - @@ -3832,8 +4059,8 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL) { /* - * Write out the table and function stats for this DB into the - * appropriate per-DB stat file, if required. + * Write out the table, function, and subscription-worker stats for + * this DB into the appropriate per-DB stat file, if required. 
*/ if (allDbs || pgstat_db_requested(dbentry->databaseid)) { @@ -3947,8 +4174,10 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) { HASH_SEQ_STATUS tstat; HASH_SEQ_STATUS fstat; + HASH_SEQ_STATUS sstat; PgStat_StatTabEntry *tabentry; PgStat_StatFuncEntry *funcentry; + PgStat_StatSubWorkerEntry *subwentry; FILE *fpout; int32 format_id; Oid dbid = dbentry->databaseid; @@ -4004,6 +4233,17 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) } /* + * Walk through the database's subscription worker stats table. + */ + hash_seq_init(&sstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL) + { + fputc('S', fpout); + rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error * after each individual fputc or fwrite above. @@ -4061,8 +4301,9 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) * files after reading; the in-memory status is now authoritative, and the * files would be out of date in case somebody else reads them. * - * If a 'deep' read is requested, table/function stats are read, otherwise - * the table/function hash tables remain empty. + * If a 'deep' read is requested, table/function/subscription-worker stats are + * read, otherwise the table/function/subscription-worker hash tables remain + * empty. 
* ---------- */ static HTAB * @@ -4241,6 +4482,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); dbentry->tables = NULL; dbentry->functions = NULL; + dbentry->subworkers = NULL; /* * In the collector, disregard the timestamp we read from the @@ -4252,8 +4494,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbentry->stats_timestamp = 0; /* - * Don't create tables/functions hashtables for uninteresting - * databases. + * Don't create tables/functions/subworkers hashtables for + * uninteresting databases. */ if (onlydb != InvalidOid) { @@ -4278,6 +4520,14 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + hash_ctl.hcxt = pgStatLocalContext; + dbentry->subworkers = hash_create("Per-database subscription worker", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* * If requested, read the data from the database-specific * file. Otherwise we just leave the hashtables empty. @@ -4286,6 +4536,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) pgstat_read_db_statsfile(dbentry->databaseid, dbentry->tables, dbentry->functions, + dbentry->subworkers, permanent); break; @@ -4363,19 +4614,21 @@ done: * As in pgstat_read_statsfiles, if the permanent file is requested, it is * removed after reading. * - * Note: this code has the ability to skip storing per-table or per-function - * data, if NULL is passed for the corresponding hashtable. That's not used - * at the moment though. + * Note: this code has the ability to skip storing per-table, per-function, or + * per-subscription-worker data, if NULL is passed for the corresponding hashtable. + * That's not used at the moment though. 
* ---------- */ static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - bool permanent) + HTAB *subworkerhash, bool permanent) { PgStat_StatTabEntry *tabentry; PgStat_StatTabEntry tabbuf; PgStat_StatFuncEntry funcbuf; PgStat_StatFuncEntry *funcentry; + PgStat_StatSubWorkerEntry subwbuf; + PgStat_StatSubWorkerEntry *subwentry; FILE *fpin; int32 format_id; bool found; @@ -4490,6 +4743,41 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, break; /* + * 'S' A PgStat_StatSubWorkerEntry struct describing + * subscription worker statistics. + */ + case 'S': + if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry), + fpin) != sizeof(PgStat_StatSubWorkerEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* + * Skip if subscription worker data not wanted. + */ + if (subworkerhash == NULL) + break; + + subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash, + (void *) &subwbuf.key, + HASH_ENTER, &found); + + if (found) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + memcpy(subwentry, &subwbuf, sizeof(subwbuf)); + break; + + /* * 'E' The EOF marker of a complete stats file. 
*/ case 'E': @@ -5162,6 +5450,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); + if (dbentry->subworkers != NULL) + hash_destroy(dbentry->subworkers); if (hash_search(pgStatDBHash, (void *) &dbid, @@ -5199,13 +5489,16 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); + if (dbentry->subworkers != NULL) + hash_destroy(dbentry->subworkers); dbentry->tables = NULL; dbentry->functions = NULL; + dbentry->subworkers = NULL; /* * Reset database-level stats, too. This creates empty hash tables for - * tables and functions. + * tables, functions, and subscription workers. */ reset_dbentry_counters(dbentry); } @@ -5274,6 +5567,14 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len) else if (msg->m_resettype == RESET_FUNCTION) (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid), HASH_REMOVE, NULL); + else if (msg->m_resettype == RESET_SUBWORKER) + { + PgStat_StatSubWorkerKey key; + + key.subid = msg->m_objectid; + key.subrelid = msg->m_subobjectid; + (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL); + } } /* ---------- @@ -5817,6 +6118,84 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) } /* ---------- + * pgstat_recv_subscription_purge() - + * + * Process a SUBSCRIPTIONPURGE message. 
+ * ---------- + */ +static void +pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len) +{ + HASH_SEQ_STATUS hstat; + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *subwentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, false); + + /* No need to purge if we don't even know the database */ + if (!dbentry || !dbentry->subworkers) + return; + + /* Remove all subscription worker statistics for the given subscriptions */ + hash_seq_init(&hstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + { + for (int i = 0; i < msg->m_nentries; i++) + { + if (subwentry->key.subid == msg->m_subids[i]) + { + (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key), + HASH_REMOVE, NULL); + break; + } + } + } +} + +/* ---------- + * pgstat_recv_subworker_error() - + * + * Process a SUBWORKERERROR message. + * ---------- + */ +static void +pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *subwentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + + /* Get the subscription worker stats */ + subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid, + msg->m_subrelid, true); + Assert(subwentry); + + if (subwentry->last_error_relid == msg->m_relid && + subwentry->last_error_command == msg->m_command && + subwentry->last_error_xid == msg->m_xid && + strcmp(subwentry->last_error_message, msg->m_message) == 0) + { + /* + * The same error occurred again in succession, just update its + * timestamp and count. 
+ */ + subwentry->last_error_count++; + subwentry->last_error_time = msg->m_timestamp; + return; + } + + /* Otherwise, update the error information */ + subwentry->last_error_relid = msg->m_relid; + subwentry->last_error_command = msg->m_command; + subwentry->last_error_xid = msg->m_xid; + subwentry->last_error_count = 1; + subwentry->last_error_time = msg->m_timestamp; + strlcpy(subwentry->last_error_message, msg->m_message, + PGSTAT_SUBWORKERERROR_MSGLEN); +} + +/* ---------- * pgstat_write_statsfile_needed() - * * Do we need to write out any stats files? diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ae1b391bdae..2e79302a48a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3332,6 +3332,7 @@ void ApplyWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); + MemoryContext cctx = CurrentMemoryContext; MemoryContext oldctx; char originname[NAMEDATALEN]; XLogRecPtr origin_startpos; @@ -3432,8 +3433,30 @@ ApplyWorkerMain(Datum main_arg) { char *syncslotname; - /* This is table synchronization worker, call initial sync. */ - syncslotname = LogicalRepSyncTableStart(&origin_startpos); + PG_TRY(); + { + /* This is table synchronization worker, call initial sync. */ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + } + PG_CATCH(); + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + /* + * Report the table sync error. There is no corresponding message + * type for table synchronization. 
+ */ + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relid, + 0, /* message type */ + InvalidTransactionId, + errdata->message); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + PG_END_TRY(); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); @@ -3551,7 +3574,32 @@ ApplyWorkerMain(Datum main_arg) } /* Run the main loop. */ - LogicalRepApplyLoop(origin_startpos); + PG_TRY(); + { + LogicalRepApplyLoop(origin_startpos); + } + PG_CATCH(); + { + /* report the apply error */ + if (apply_error_callback_arg.command != 0) + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + apply_error_callback_arg.rel != NULL + ? apply_error_callback_arg.rel->localreloid + : InvalidOid, + apply_error_callback_arg.command, + apply_error_callback_arg.remote_xid, + errdata->message); + MemoryContextSwitchTo(ecxt); + } + + PG_RE_THROW(); + } + PG_END_TRY(); proc_exit(0); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index e64857e5409..f529c1561ab 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2172,7 +2172,7 @@ pg_stat_reset_single_table_counters(PG_FUNCTION_ARGS) { Oid taboid = PG_GETARG_OID(0); - pgstat_reset_single_counter(taboid, RESET_TABLE); + pgstat_reset_single_counter(taboid, InvalidOid, RESET_TABLE); PG_RETURN_VOID(); } @@ -2182,11 +2182,38 @@ pg_stat_reset_single_function_counters(PG_FUNCTION_ARGS) { Oid funcoid = PG_GETARG_OID(0); - pgstat_reset_single_counter(funcoid, RESET_FUNCTION); + pgstat_reset_single_counter(funcoid, InvalidOid, RESET_FUNCTION); PG_RETURN_VOID(); } +Datum +pg_stat_reset_subscription_worker_subrel(PG_FUNCTION_ARGS) +{ + Oid subid = PG_GETARG_OID(0); + Oid relid = PG_ARGISNULL(1) ? 
InvalidOid : PG_GETARG_OID(1); + + pgstat_reset_single_counter(subid, relid, RESET_SUBWORKER); + + PG_RETURN_VOID(); +} + +/* Reset all subscription worker stats associated with the given subscription */ +Datum +pg_stat_reset_subscription_worker_sub(PG_FUNCTION_ARGS) +{ + Oid subid = PG_GETARG_OID(0); + + /* + * Use subscription drop message to remove statistics of all subscription + * workers. + */ + pgstat_report_subscription_drop(subid); + + PG_RETURN_VOID(); +} + + /* Reset SLRU counters (a specific one or all of them). */ Datum pg_stat_reset_slru(PG_FUNCTION_ARGS) @@ -2380,3 +2407,100 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +/* + * Get the subscription worker statistics for the given subscription + * (and relation). + */ +Datum +pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 8 + Oid subid = PG_GETARG_OID(0); + Oid subrelid; + TupleDesc tupdesc; + Datum values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; + bool nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; + PgStat_StatSubWorkerEntry *wentry; + int i; + + if (PG_ARGISNULL(1)) + subrelid = InvalidOid; + else + subrelid = PG_GETARG_OID(1); + + /* Get subscription worker stats */ + wentry = pgstat_fetch_stat_subworker_entry(subid, subrelid); + + /* Return NULL if there is no worker statistics */ + if (wentry == NULL) + PG_RETURN_NULL(); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, 
"last_error_xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + i = 0; + /* subid */ + values[i++] = ObjectIdGetDatum(subid); + + /* subrelid */ + if (OidIsValid(subrelid)) + values[i++] = ObjectIdGetDatum(subrelid); + else + nulls[i++] = true; + + /* last_error_relid */ + if (OidIsValid(wentry->last_error_relid)) + values[i++] = ObjectIdGetDatum(wentry->last_error_relid); + else + nulls[i++] = true; + + /* last_error_command */ + if (wentry->last_error_command != 0) + values[i++] = + CStringGetTextDatum(logicalrep_message_type(wentry->last_error_command)); + else + nulls[i++] = true; + + /* last_error_xid */ + if (TransactionIdIsValid(wentry->last_error_xid)) + values[i++] = TransactionIdGetDatum(wentry->last_error_xid); + else + nulls[i++] = true; + + /* last_error_count */ + values[i++] = Int64GetDatum(wentry->last_error_count); + + /* last_error_message */ + values[i++] = CStringGetTextDatum(wentry->last_error_message); + + /* last_error_time */ + if (wentry->last_error_time != 0) + values[i++] = TimestampTzGetDatum(wentry->last_error_time); + else + nulls[i++] = true; + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} |
