data += xl_relfilenodes->nrels * sizeof(RelFileNode);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+ {
+ xl_xact_stats_items *xl_drops = (xl_xact_stats_items *) data;
+
+ parsed->nstats = xl_drops->nitems;
+ parsed->stats = xl_drops->items;
+
+ data += MinSizeOfXactStatsItems;
+ data += xl_drops->nitems * sizeof(xl_xact_stats_item);
+ }
+
if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
{
xl_xact_invals *xl_invals = (xl_xact_invals *) data;
data += xl_relfilenodes->nrels * sizeof(RelFileNode);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+ {
+ xl_xact_stats_items *xl_drops = (xl_xact_stats_items *) data;
+
+ parsed->nstats = xl_drops->nitems;
+ parsed->stats = xl_drops->items;
+
+ data += MinSizeOfXactStatsItems;
+ data += xl_drops->nitems * sizeof(xl_xact_stats_item);
+ }
+
if (parsed->xinfo & XACT_XINFO_HAS_TWOPHASE)
{
xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
parsed->abortnodes = (RelFileNode *) bufptr;
bufptr += MAXALIGN(xlrec->nabortrels * sizeof(RelFileNode));
+ parsed->stats = (xl_xact_stats_item *) bufptr;
+ bufptr += MAXALIGN(xlrec->ncommitstats * sizeof(xl_xact_stats_item));
+
+ parsed->abortstats = (xl_xact_stats_item *) bufptr;
+ bufptr += MAXALIGN(xlrec->nabortstats * sizeof(xl_xact_stats_item));
+
parsed->msgs = (SharedInvalidationMessage *) bufptr;
bufptr += MAXALIGN(xlrec->ninvalmsgs * sizeof(SharedInvalidationMessage));
}
}
}
+static void
+xact_desc_stats(StringInfo buf, const char *label,
+ int ndropped, xl_xact_stats_item *dropped_stats)
+{
+ int i;
+
+ if (ndropped > 0)
+ {
+ appendStringInfo(buf, "; %sdropped stats:", label);
+ for (i = 0; i < ndropped; i++)
+ {
+ appendStringInfo(buf, " %u/%u/%u",
+ dropped_stats[i].kind,
+ dropped_stats[i].dboid,
+ dropped_stats[i].objoid);
+ }
+ }
+}
+
static void
xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
{
xact_desc_relations(buf, "rels", parsed.nrels, parsed.xnodes);
xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
+ xact_desc_stats(buf, "", parsed.nstats, parsed.stats);
standby_desc_invalidations(buf, parsed.nmsgs, parsed.msgs, parsed.dbId,
parsed.tsId,
LSN_FORMAT_ARGS(parsed.origin_lsn),
timestamptz_to_str(parsed.origin_timestamp));
}
+
+ xact_desc_stats(buf, "", parsed.nstats, parsed.stats);
}
static void
xact_desc_relations(buf, "rels(commit)", parsed.nrels, parsed.xnodes);
xact_desc_relations(buf, "rels(abort)", parsed.nabortrels,
parsed.abortnodes);
+ xact_desc_stats(buf, "commit ", parsed.nstats, parsed.stats);
+ xact_desc_stats(buf, "abort ", parsed.nabortstats, parsed.abortstats);
xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
standby_desc_invalidations(buf, parsed.nmsgs, parsed.msgs, parsed.dbId,
TransactionId *children,
int nrels,
RelFileNode *rels,
+ int nstats,
+ xl_xact_stats_item *stats,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
bool initfileinval,
TransactionId *children,
int nrels,
RelFileNode *rels,
+ int nstats,
+ xl_xact_stats_item *stats,
const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
TransactionId *children;
RelFileNode *commitrels;
RelFileNode *abortrels;
+ xl_xact_stats_item *abortstats = NULL;
+ xl_xact_stats_item *commitstats = NULL;
SharedInvalidationMessage *invalmsgs;
/* Initialize linked list */
hdr.nsubxacts = xactGetCommittedChildren(&children);
hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
+ hdr.ncommitstats =
+ pgstat_get_transactional_drops(true, &commitstats);
+ hdr.nabortstats =
+ pgstat_get_transactional_drops(false, &abortstats);
hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
&hdr.initfileinval);
hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode));
pfree(abortrels);
}
+ if (hdr.ncommitstats > 0)
+ {
+ save_state_data(commitstats,
+ hdr.ncommitstats * sizeof(xl_xact_stats_item));
+ pfree(commitstats);
+ }
+ if (hdr.nabortstats > 0)
+ {
+ save_state_data(abortstats,
+ hdr.nabortstats * sizeof(xl_xact_stats_item));
+ pfree(abortstats);
+ }
if (hdr.ninvalmsgs > 0)
{
save_state_data(invalmsgs,
RelFileNode *abortrels;
RelFileNode *delrels;
int ndelrels;
+ xl_xact_stats_item *commitstats;
+ xl_xact_stats_item *abortstats;
SharedInvalidationMessage *invalmsgs;
/*
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
abortrels = (RelFileNode *) bufptr;
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ commitstats = (xl_xact_stats_item*) bufptr;
+ bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
+ abortstats = (xl_xact_stats_item*) bufptr;
+ bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
invalmsgs = (SharedInvalidationMessage *) bufptr;
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
RecordTransactionCommitPrepared(xid,
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
+ hdr->ncommitstats,
+ commitstats,
hdr->ninvalmsgs, invalmsgs,
hdr->initfileinval, gid);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
hdr->nabortrels, abortrels,
+ hdr->nabortstats,
+ abortstats,
gid);
ProcArrayRemove(proc, latestXid);
/* Make sure files supposed to be dropped are dropped */
DropRelationFiles(delrels, ndelrels, false);
+ if (isCommit)
+ pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
+ else
+ pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);
+
/*
* Handle cache invalidation messages.
*
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+ bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
+ bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
/*
TransactionId *children,
int nrels,
RelFileNode *rels,
+ int nstats,
+ xl_xact_stats_item *stats,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
bool initfileinval,
*/
recptr = XactLogCommitRecord(committs,
nchildren, children, nrels, rels,
+ nstats, stats,
ninvalmsgs, invalmsgs,
initfileinval,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
TransactionId *children,
int nrels,
RelFileNode *rels,
+ int nstats,
+ xl_xact_stats_item *stats,
const char *gid)
{
XLogRecPtr recptr;
recptr = XactLogAbortRecord(GetCurrentTimestamp(),
nchildren, children,
nrels, rels,
+ nstats, stats,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
xid, gid);
RelFileNode *rels;
int nchildren;
TransactionId *children;
+ int ndroppedstats = 0;
+ xl_xact_stats_item *droppedstats = NULL;
int nmsgs = 0;
SharedInvalidationMessage *invalMessages = NULL;
bool RelcacheInitFileInval = false;
/* Get data needed for commit record */
nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
+ ndroppedstats = pgstat_get_transactional_drops(true, &droppedstats);
if (XLogStandbyInfoActive())
nmsgs = xactGetCommittedInvalidationMessages(&invalMessages,
&RelcacheInitFileInval);
/*
* We expect that every RelationDropStorage is followed by a catalog
* update, and hence XID assignment, so we shouldn't get here with any
- * pending deletes. Use a real test not just an Assert to check this,
- * since it's a bit fragile.
+ * pending deletes. Same is true for dropping stats.
+ *
+ * Use a real test not just an Assert to check this, since it's a bit
+ * fragile.
*/
- if (nrels != 0)
+ if (nrels != 0 || ndroppedstats != 0)
elog(ERROR, "cannot commit a transaction that deleted files but has no xid");
/* Can't have child XIDs either; AssignTransactionId enforces this */
XactLogCommitRecord(xactStopTimestamp,
nchildren, children, nrels, rels,
+ ndroppedstats, droppedstats,
nmsgs, invalMessages,
RelcacheInitFileInval,
MyXactFlags,
/* Clean up local data */
if (rels)
pfree(rels);
+ if (ndroppedstats)
+ pfree(droppedstats);
return latestXid;
}
TransactionId latestXid;
int nrels;
RelFileNode *rels;
+ int ndroppedstats = 0;
+ xl_xact_stats_item *droppedstats = NULL;
int nchildren;
TransactionId *children;
TimestampTz xact_time;
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
+ ndroppedstats = pgstat_get_transactional_drops(false, &droppedstats);
/* XXX do we really need a critical section here? */
START_CRIT_SECTION();
XactLogAbortRecord(xact_time,
nchildren, children,
nrels, rels,
+ ndroppedstats, droppedstats,
MyXactFlags, InvalidTransactionId,
NULL);
/* And clean up local data */
if (rels)
pfree(rels);
+ if (ndroppedstats)
+ pfree(droppedstats);
return latestXid;
}
XactLogCommitRecord(TimestampTz commit_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
+ int ndroppedstats, xl_xact_stats_item *droppedstats,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval,
int xactflags, TransactionId twophase_xid,
xl_xact_dbinfo xl_dbinfo;
xl_xact_subxacts xl_subxacts;
xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_stats_items xl_dropped_stats;
xl_xact_invals xl_invals;
xl_xact_twophase xl_twophase;
xl_xact_origin xl_origin;
info |= XLR_SPECIAL_REL_UPDATE;
}
+ if (ndroppedstats > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
+ xl_dropped_stats.nitems = ndroppedstats;
+ }
+
if (nmsgs > 0)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
nrels * sizeof(RelFileNode));
}
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+ {
+ XLogRegisterData((char *) (&xl_dropped_stats),
+ MinSizeOfXactStatsItems);
+ XLogRegisterData((char *) droppedstats,
+ ndroppedstats * sizeof(xl_xact_stats_item));
+ }
+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
{
XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
+ int ndroppedstats, xl_xact_stats_item *droppedstats,
int xactflags, TransactionId twophase_xid,
const char *twophase_gid)
{
xl_xact_xinfo xl_xinfo;
xl_xact_subxacts xl_subxacts;
xl_xact_relfilenodes xl_relfilenodes;
+ xl_xact_stats_items xl_dropped_stats;
xl_xact_twophase xl_twophase;
xl_xact_dbinfo xl_dbinfo;
xl_xact_origin xl_origin;
info |= XLR_SPECIAL_REL_UPDATE;
}
+ if (ndroppedstats > 0)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
+ xl_dropped_stats.nitems = ndroppedstats;
+ }
+
if (TransactionIdIsValid(twophase_xid))
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
nrels * sizeof(RelFileNode));
}
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+ {
+ XLogRegisterData((char *) (&xl_dropped_stats),
+ MinSizeOfXactStatsItems);
+ XLogRegisterData((char *) droppedstats,
+ ndroppedstats * sizeof(xl_xact_stats_item));
+ }
+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
{
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
}
+ if (parsed->nstats > 0)
+ {
+ /* see equivalent call for relations above */
+ XLogFlush(lsn);
+
+ pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true);
+ }
+
/*
* We issue an XLogFlush() for the same reason we emit ForceSyncCommit()
* in normal operation. For example, in CREATE DATABASE, we copy all files
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
}
+
+ if (parsed->nstats > 0)
+ {
+ /* see equivalent call for relations above */
+ XLogFlush(lsn);
+
+ pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true);
+ }
}
void
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "partitioning/partdesc.h"
+#include "pgstat.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "utils/builtins.h"
if (oncommit != ONCOMMIT_NOOP)
register_on_commit_action(relid, oncommit);
+ /* ensure that stats are dropped if transaction aborts */
+ pgstat_create_relation(new_rel_desc);
+
/*
* ok, the relation has been cataloged, so close our relations and return
* the OID of the newly created relation.
if (RELKIND_HAS_STORAGE(rel->rd_rel->relkind))
RelationDropStorage(rel);
+ /* ensure that stats are dropped if transaction commits */
+ pgstat_drop_relation(rel);
+
/*
* Close relcache entry, but *keep* AccessExclusiveLock on the relation
* until transaction commit. This ensures no one else will try to do
#include "parser/analyze.h"
#include "parser/parse_coerce.h"
#include "parser/parse_type.h"
+#include "pgstat.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
AtEOXact_GUC(true, save_nestlevel);
}
+ /* ensure that stats are dropped if transaction commits */
+ if (!is_update)
+ pgstat_create_function(retval);
+
return myself;
}
table_close(relation, RowExclusiveLock);
+ pgstat_drop_function(funcOid);
+
/*
* If there's a pg_aggregate tuple, delete that too.
*/
table_close(rel, RowExclusiveLock);
+ pgstat_create_subscription(subid);
+
if (opts.enabled)
ApplyLauncherWakeupAtCommit();
* gets lost.
*/
if (slotname)
- pgstat_report_subscription_drop(subid);
+ pgstat_drop_subscription(subid);
table_close(rel, NoLock);
}
CHECK_FOR_INTERRUPTS();
if (hash_search(htab, (void *) &(subentry->subid), HASH_FIND, NULL) == NULL)
- pgstat_report_subscription_drop(subentry->subid);
+ pgstat_drop_subscription(subentry->subid);
}
hash_destroy(htab);
smgrsw[which].smgr_close(rels[i], forknum);
}
- /*
- * It'd be nice to tell the stats collector to forget them immediately,
- * too. But we can't because we don't know the OIDs.
- */
-
/*
* Send a shared-inval message to force other backends to close any
* dangling smgr references they may have for these rels. We should do
static instr_time total_func_time;
+/*
+ * Ensure that stats are dropped if transaction aborts.
+ */
+void
+pgstat_create_function(Oid proid)
+{
+ pgstat_create_transactional(PGSTAT_KIND_FUNCTION,
+ MyDatabaseId,
+ proid);
+}
+
+/*
+ * Ensure that stats are dropped if transaction commits.
+ */
+void
+pgstat_drop_function(Oid proid)
+{
+ pgstat_drop_transactional(PGSTAT_KIND_FUNCTION,
+ MyDatabaseId,
+ proid);
+}
+
/*
* Initialize function call usage data.
* Called by the executor before invoking a function.
}
/*
- * Tell the collector that we just dropped a relation.
- * (If the message gets lost, we will still clean the dead entry eventually
- * via future invocations of pgstat_vacuum_stat().)
- *
- * Currently not used for lack of any good place to call it; we rely
- * entirely on pgstat_vacuum_stat() to clean out stats for dead rels.
+ * Ensure that stats are dropped if transaction aborts.
*/
-#ifdef NOT_USED
void
-pgstat_drop_relation(Oid relid)
+pgstat_create_relation(Relation rel)
{
- PgStat_MsgTabpurge msg;
- int len;
-
- if (pgStatSock == PGINVALID_SOCKET)
- return;
-
- msg.m_tableid[0] = relid;
- msg.m_nentries = 1;
-
- len = offsetof(PgStat_MsgTabpurge, m_tableid[0]) + sizeof(Oid);
+ pgstat_create_transactional(PGSTAT_KIND_RELATION,
+ rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId,
+ RelationGetRelid(rel));
+}
- pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_TABPURGE);
- msg.m_databaseid = MyDatabaseId;
- pgstat_send(&msg, len);
+/*
+ * Ensure that stats are dropped if transaction commits.
+ */
+void
+pgstat_drop_relation(Relation rel)
+{
+ pgstat_drop_transactional(PGSTAT_KIND_RELATION,
+ rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId,
+ RelationGetRelid(rel));
}
-#endif /* NOT_USED */
/*
* Report that the table was just vacuumed.
pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionError));
}
+/*
+ * Report creating the subscription.
+ *
+ * Ensures that stats are dropped if transaction rolls back.
+ */
+void
+pgstat_create_subscription(Oid subid)
+{
+ pgstat_create_transactional(PGSTAT_KIND_SUBSCRIPTION,
+ InvalidOid, subid);
+}
+
/*
* Report dropping the subscription.
+ *
+ * Ensures that stats are dropped if transaction commits.
*/
void
-pgstat_report_subscription_drop(Oid subid)
+pgstat_drop_subscription(Oid subid)
{
PgStat_MsgSubscriptionDrop msg;
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONDROP);
msg.m_subid = subid;
pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionDrop));
+
+ pgstat_drop_transactional(PGSTAT_KIND_SUBSCRIPTION,
+ InvalidOid, subid);
}
#include "utils/pgstat_internal.h"
+typedef struct PgStat_PendingDroppedStatsItem
+{
+ xl_xact_stats_item item;
+ bool is_create;
+ dlist_node node;
+} PgStat_PendingDroppedStatsItem;
+
+
+static void AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit);
+static void AtEOSubXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state,
+ bool isCommit, int nestDepth);
+
static PgStat_SubXactStatus *pgStatXactStack = NULL;
Assert(xact_state->prev == NULL);
AtEOXact_PgStat_Relations(xact_state, isCommit);
+ AtEOXact_PgStat_DroppedStats(xact_state, isCommit);
}
pgStatXactStack = NULL;
pgstat_clear_snapshot();
}
+/*
+ * When committing, drop stats for objects dropped in the transaction. When
+ * aborting, drop stats for objects created in the transaction.
+ */
+static void
+AtEOXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state, bool isCommit)
+{
+ dlist_mutable_iter iter;
+
+ if (xact_state->pending_drops_count == 0)
+ {
+ Assert(dlist_is_empty(&xact_state->pending_drops));
+ return;
+ }
+
+ dlist_foreach_modify(iter, &xact_state->pending_drops)
+ {
+ PgStat_PendingDroppedStatsItem *pending =
+ dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
+
+ if (isCommit && !pending->is_create)
+ {
+ /*
+ * Transaction that dropped an object committed. Drop the stats
+ * too.
+ */
+ /* will do work in subsequent commit */
+ }
+ else if (!isCommit && pending->is_create)
+ {
+ /*
+ * Transaction that created an object aborted. Drop the stats
+ * associated with the object.
+ */
+ /* will do work in subsequent commit */
+ }
+
+ dlist_delete(&pending->node);
+ xact_state->pending_drops_count--;
+ pfree(pending);
+ }
+}
+
/*
* Called from access/transam/xact.c at subtransaction commit/abort.
*/
pgStatXactStack = xact_state->prev;
AtEOSubXact_PgStat_Relations(xact_state, isCommit, nestDepth);
+ AtEOSubXact_PgStat_DroppedStats(xact_state, isCommit, nestDepth);
pfree(xact_state);
}
}
+/*
+ * Like AtEOXact_PgStat_DroppedStats(), but for subtransactions.
+ */
+static void
+AtEOSubXact_PgStat_DroppedStats(PgStat_SubXactStatus *xact_state,
+ bool isCommit, int nestDepth)
+{
+ PgStat_SubXactStatus *parent_xact_state;
+ dlist_mutable_iter iter;
+
+ if (xact_state->pending_drops_count == 0)
+ return;
+
+ parent_xact_state = pgstat_xact_stack_level_get(nestDepth - 1);
+
+ dlist_foreach_modify(iter, &xact_state->pending_drops)
+ {
+ PgStat_PendingDroppedStatsItem *pending =
+ dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
+
+ dlist_delete(&pending->node);
+ xact_state->pending_drops_count--;
+
+ if (!isCommit && pending->is_create)
+ {
+ /*
+ * Subtransaction creating a new stats object aborted. Drop the
+ * stats object.
+ */
+ /* will do work in subsequent commit */
+ pfree(pending);
+ }
+ else if (isCommit)
+ {
+ /*
+ * Subtransaction dropping a stats object committed. Can't yet
+ * remove the stats object, the surrounding transaction might
+ * still abort. Pass it on to the parent.
+ */
+ dlist_push_tail(&parent_xact_state->pending_drops, &pending->node);
+ parent_xact_state->pending_drops_count++;
+ }
+ else
+ {
+ pfree(pending);
+ }
+ }
+
+ Assert(xact_state->pending_drops_count == 0);
+}
+
/*
* Save the transactional stats state at 2PC transaction prepare.
*/
xact_state = (PgStat_SubXactStatus *)
MemoryContextAlloc(TopTransactionContext,
sizeof(PgStat_SubXactStatus));
+ dlist_init(&xact_state->pending_drops);
+ xact_state->pending_drops_count = 0;
xact_state->nest_level = nest_level;
xact_state->prev = pgStatXactStack;
xact_state->first = NULL;
}
return xact_state;
}
+
+/*
+ * Get stat items that need to be dropped at commit / abort.
+ *
+ * When committing, stats for objects that have been dropped in the
+ * transaction are returned. When aborting, stats for newly created objects are
+ * returned.
+ *
+ * Used by COMMIT / ABORT and 2PC PREPARE processing when building their
+ * respective WAL records, to ensure stats are dropped in case of a crash / on
+ * standbys.
+ *
+ * The list of items is allocated in CurrentMemoryContext and must be freed by
+ * the caller (directly or via memory context reset).
+ */
+int
+pgstat_get_transactional_drops(bool isCommit, xl_xact_stats_item **items)
+{
+ PgStat_SubXactStatus *xact_state = pgStatXactStack;
+ int nitems = 0;
+ dlist_iter iter;
+
+ if (xact_state == NULL)
+ return 0;
+
+ /*
+ * We expect to be called for subtransaction abort (which logs a WAL
+ * record), but not for subtransaction commit (which doesn't).
+ */
+ Assert(!isCommit || xact_state->nest_level == 1);
+ Assert(!isCommit || xact_state->prev == NULL);
+
+ *items = palloc(xact_state->pending_drops_count
+ * sizeof(xl_xact_stats_item));
+
+ dlist_foreach(iter, &xact_state->pending_drops)
+ {
+ PgStat_PendingDroppedStatsItem *pending =
+ dlist_container(PgStat_PendingDroppedStatsItem, node, iter.cur);
+
+ if (isCommit && pending->is_create)
+ continue;
+ if (!isCommit && !pending->is_create)
+ continue;
+
+ Assert(nitems < xact_state->pending_drops_count);
+ (*items)[nitems++] = pending->item;
+ }
+
+ return nitems;
+}
+
+/*
+ * Execute scheduled drops post-commit. Called from xact_redo_commit() /
+ * xact_redo_abort() during recovery, and from FinishPreparedTransaction()
+ * during normal 2PC COMMIT/ABORT PREPARED processing.
+ */
+void
+pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo)
+{
+ if (ndrops == 0)
+ return;
+
+ for (int i = 0; i < ndrops; i++)
+ {
+ /* will do work in subsequent commit */
+ }
+}
+
+static void
+create_drop_transactional_internal(PgStat_Kind kind, Oid dboid, Oid objoid, bool is_create)
+{
+ int nest_level = GetCurrentTransactionNestLevel();
+ PgStat_SubXactStatus *xact_state;
+ PgStat_PendingDroppedStatsItem *drop = (PgStat_PendingDroppedStatsItem *)
+ MemoryContextAlloc(TopTransactionContext, sizeof(PgStat_PendingDroppedStatsItem));
+
+ xact_state = pgstat_xact_stack_level_get(nest_level);
+
+ drop->is_create = is_create;
+ drop->item.kind = kind;
+ drop->item.dboid = dboid;
+ drop->item.objoid = objoid;
+
+ dlist_push_tail(&xact_state->pending_drops, &drop->node);
+ xact_state->pending_drops_count++;
+}
+
+/*
+ * Create a stats entry for a newly created database object in a transactional
+ * manner.
+ *
+ * I.e. if the current (sub-)transaction aborts, the stats entry will also be
+ * dropped.
+ */
+void
+pgstat_create_transactional(PgStat_Kind kind, Oid dboid, Oid objoid)
+{
+ create_drop_transactional_internal(kind, dboid, objoid, /* create */ true);
+}
+
+/*
+ * Drop a stats entry for a just dropped database object in a transactional
+ * manner.
+ *
+ * I.e. if the current (sub-)transaction aborts, the stats entry will stay
+ * alive.
+ */
+void
+pgstat_drop_transactional(PgStat_Kind kind, Oid dboid, Oid objoid)
+{
+ create_drop_transactional_internal(kind, dboid, objoid, /* create */ false);
+}
#define XACT_XINFO_HAS_ORIGIN (1U << 5)
#define XACT_XINFO_HAS_AE_LOCKS (1U << 6)
#define XACT_XINFO_HAS_GID (1U << 7)
+#define XACT_XINFO_HAS_DROPPED_STATS (1U << 8)
/*
* Also stored in xinfo, these indicating a variety of additional actions that
typedef struct xl_xact_xinfo
{
/*
- * Even though we right now only require 1 byte of space in xinfo we use
+ * Even though we right now only require two bytes of space in xinfo we use
* four so following records don't have to care about alignment. Commit
* records can be large, so copying large portions isn't attractive.
*/
} xl_xact_relfilenodes;
#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
+/*
+ * A transactionally dropped statistics entry.
+ *
+ * Declared here rather than pgstat.h because pgstat.h can't be included from
+ * frontend code, but the WAL format needs to be readable by frontend
+ * programs.
+ */
+typedef struct xl_xact_stats_item
+{
+ int kind;
+ Oid dboid;
+ Oid objoid;
+} xl_xact_stats_item;
+
+typedef struct xl_xact_stats_items
+{
+ int nitems;
+ xl_xact_stats_item items[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_stats_items;
+#define MinSizeOfXactStatsItems offsetof(xl_xact_stats_items, items)
+
typedef struct xl_xact_invals
{
int nmsgs; /* number of shared inval msgs */
/* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
/* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
+ /* xl_xact_stats_items follows if XINFO_HAS_DROPPED_STATS */
/* xl_xact_invals follows if XINFO_HAS_INVALS */
/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
/* xl_xact_dbinfo follows if XINFO_HAS_DBINFO */
/* xl_xact_subxacts follows if XINFO_HAS_SUBXACT */
/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
+ /* xl_xact_stats_items follows if XINFO_HAS_DROPPED_STATS */
/* No invalidation messages needed. */
/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
/* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */
int32 nsubxacts; /* number of following subxact XIDs */
int32 ncommitrels; /* number of delete-on-commit rels */
int32 nabortrels; /* number of delete-on-abort rels */
+ int32 ncommitstats; /* number of stats to drop on commit */
+ int32 nabortstats; /* number of stats to drop on abort */
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
int nrels;
RelFileNode *xnodes;
+ int nstats;
+ xl_xact_stats_item *stats;
+
int nmsgs;
SharedInvalidationMessage *msgs;
char twophase_gid[GIDSIZE]; /* only for 2PC */
int nabortrels; /* only for 2PC */
RelFileNode *abortnodes; /* only for 2PC */
+ int nabortstats; /* only for 2PC */
+ xl_xact_stats_item *abortstats; /* only for 2PC */
XLogRecPtr origin_lsn;
TimestampTz origin_timestamp;
int nrels;
RelFileNode *xnodes;
+ int nstats;
+ xl_xact_stats_item *stats;
+
TransactionId twophase_xid; /* only for 2PC */
char twophase_gid[GIDSIZE]; /* only for 2PC */
extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
+ int nstats,
+ xl_xact_stats_item *stats,
int nmsgs, SharedInvalidationMessage *msgs,
bool relcacheInval,
int xactflags,
extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
int nsubxacts, TransactionId *subxacts,
int nrels, RelFileNode *rels,
+ int nstats,
+ xl_xact_stats_item *stats,
int xactflags, TransactionId twophase_xid,
const char *twophase_gid);
extern void xact_redo(XLogReaderState *record);
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD10F /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD110 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
* Functions in pgstat_function.c
*/
+extern void pgstat_create_function(Oid proid);
+extern void pgstat_drop_function(Oid proid);
+
struct FunctionCallInfoBaseData;
extern void pgstat_init_function_usage(struct FunctionCallInfoBaseData *fcinfo,
PgStat_FunctionCallUsage *fcu);
* Functions in pgstat_relation.c
*/
+extern void pgstat_create_relation(Relation rel);
+extern void pgstat_drop_relation(Relation rel);
extern void pgstat_copy_relation_stats(Relation dstrel, Relation srcrel);
extern void pgstat_relation_init(Relation rel);
*/
extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
-extern void pgstat_report_subscription_drop(Oid subid);
+extern void pgstat_create_subscription(Oid subid);
+extern void pgstat_drop_subscription(Oid subid);
/*
extern void AtEOSubXact_PgStat(bool isCommit, int nestDepth);
extern void AtPrepare_PgStat(void);
extern void PostPrepare_PgStat(void);
+struct xl_xact_stats_item;
+extern int pgstat_get_transactional_drops(bool isCommit, struct xl_xact_stats_item **items);
+extern void pgstat_execute_transactional_drops(int ndrops, struct xl_xact_stats_item *items, bool is_redo);
/*
struct PgStat_SubXactStatus *prev; /* higher-level subxact if any */
+ /*
+ * Dropping the statistics for objects that dropped transactionally itself
+ * needs to be transactional. Therefore we collect the stats dropped in
+ * the current (sub-)transaction and only execute the stats drop when we
+ * know if the transaction commits/aborts. To handle replicas and crashes,
+ * stats drops are included in commit records.
+ */
+ dlist_head pending_drops;
+ int pending_drops_count;
+
/*
* Tuple insertion/deletion counts for an open transaction can't be
* propagated into PgStat_TableStatus counters until we know if it is
*/
extern PgStat_SubXactStatus *pgstat_xact_stack_level_get(int nest_level);
+extern void pgstat_drop_transactional(PgStat_Kind kind, Oid dboid, Oid objoid);
+extern void pgstat_create_transactional(PgStat_Kind kind, Oid dboid, Oid objoid);
+
/*
PgStat_MsgTempFile
PgStat_MsgVacuum
PgStat_MsgWal
+PgStat_PendingDroppedStatsItem
PgStat_SLRUStats
PgStat_StatDBEntry
PgStat_StatFuncEntry
xl_xact_parsed_prepare
xl_xact_prepare
xl_xact_relfilenodes
+xl_xact_stats_item
+xl_xact_stats_items
xl_xact_subxacts
xl_xact_twophase
xl_xact_xinfo