LCOV - code coverage report
Current view: top level - src/backend/access/transam - commit_ts.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 225 268 84.0 %
Date: 2025-07-09 20:18:08 Functions: 26 29 89.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * commit_ts.c
       4             :  *      PostgreSQL commit timestamp manager
       5             :  *
       6             :  * This module is a pg_xact-like system that stores the commit timestamp
       7             :  * for each transaction.
       8             :  *
       9             :  * XLOG interactions: this module generates an XLOG record whenever a new
      10             :  * CommitTs page is initialized to zeroes.  Other writes of CommitTs come
      11             :  * from recording of transaction commit in xact.c, which generates its own
      12             :  * XLOG records for these events and will re-perform the status update on
      13             :  * redo; so we need not make any additional XLOG entry here.
      14             :  *
      15             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      16             :  * Portions Copyright (c) 1994, Regents of the University of California
      17             :  *
      18             :  * src/backend/access/transam/commit_ts.c
      19             :  *
      20             :  *-------------------------------------------------------------------------
      21             :  */
      22             : #include "postgres.h"
      23             : 
      24             : #include "access/commit_ts.h"
      25             : #include "access/htup_details.h"
      26             : #include "access/slru.h"
      27             : #include "access/transam.h"
      28             : #include "access/xloginsert.h"
      29             : #include "access/xlogutils.h"
      30             : #include "funcapi.h"
      31             : #include "miscadmin.h"
      32             : #include "storage/shmem.h"
      33             : #include "utils/fmgrprotos.h"
      34             : #include "utils/guc_hooks.h"
      35             : #include "utils/timestamp.h"
      36             : 
      37             : /*
      38             :  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
      39             :  * everywhere else in Postgres.
      40             :  *
      41             :  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
      42             :  * CommitTs page numbering also wraps around at
      43             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
      44             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
      45             :  * explicit notice of that fact in this module, except when comparing segment
      46             :  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
      47             :  */
      48             : 
      49             : /*
      50             :  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
      51             :  * the largest possible file name is more than 5 chars long; see
      52             :  * SlruScanDirectory.
      53             :  */
      54             : typedef struct CommitTimestampEntry
      55             : {
      56             :     TimestampTz time;
      57             :     RepOriginId nodeid;
      58             : } CommitTimestampEntry;
      59             : 
      60             : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
      61             :                                     sizeof(RepOriginId))
      62             : 
      63             : #define COMMIT_TS_XACTS_PER_PAGE \
      64             :     (BLCKSZ / SizeOfCommitTimestampEntry)
      65             : 
      66             : 
      67             : /*
      68             :  * Although we return an int64 the actual value can't currently exceed
      69             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
      70             :  */
      71             : static inline int64
      72        4224 : TransactionIdToCTsPage(TransactionId xid)
      73             : {
      74        4224 :     return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
      75             : }
      76             : 
      77             : #define TransactionIdToCTsEntry(xid)    \
      78             :     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
      79             : 
      80             : /*
      81             :  * Link to shared-memory data structures for CommitTs control
      82             :  */
      83             : static SlruCtlData CommitTsCtlData;
      84             : 
      85             : #define CommitTsCtl (&CommitTsCtlData)
      86             : 
      87             : /*
      88             :  * We keep a cache of the last value set in shared memory.
      89             :  *
      90             :  * This is also good place to keep the activation status.  We keep this
      91             :  * separate from the GUC so that the standby can activate the module if the
      92             :  * primary has it active independently of the value of the GUC.
      93             :  *
      94             :  * This is protected by CommitTsLock.  In some places, we use commitTsActive
      95             :  * without acquiring the lock; where this happens, a comment explains the
      96             :  * rationale for it.
      97             :  */
      98             : typedef struct CommitTimestampShared
      99             : {
     100             :     TransactionId xidLastCommit;
     101             :     CommitTimestampEntry dataLastCommit;
     102             :     bool        commitTsActive;
     103             : } CommitTimestampShared;
     104             : 
     105             : static CommitTimestampShared *commitTsShared;
     106             : 
     107             : 
     108             : /* GUC variable */
     109             : bool        track_commit_timestamp;
     110             : 
     111             : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     112             :                                  TransactionId *subxids, TimestampTz ts,
     113             :                                  RepOriginId nodeid, int64 pageno);
     114             : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     115             :                                      RepOriginId nodeid, int slotno);
     116             : static void error_commit_ts_disabled(void);
     117             : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
     118             : static void ActivateCommitTs(void);
     119             : static void DeactivateCommitTs(void);
     120             : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
     121             : 
     122             : /*
     123             :  * TransactionTreeSetCommitTsData
     124             :  *
     125             :  * Record the final commit timestamp of transaction entries in the commit log
     126             :  * for a transaction and its subtransaction tree, as efficiently as possible.
     127             :  *
     128             :  * xid is the top level transaction id.
     129             :  *
     130             :  * subxids is an array of xids of length nsubxids, representing subtransactions
     131             :  * in the tree of xid. In various cases nsubxids may be zero.
     132             :  * The reason why tracking just the parent xid commit timestamp is not enough
     133             :  * is that the subtrans SLRU does not stay valid across crashes (it's not
     134             :  * permanent) so we need to keep the information about them here. If the
     135             :  * subtrans implementation changes in the future, we might want to revisit the
     136             :  * decision of storing timestamp info for each subxid.
     137             :  */
     138             : void
     139      297212 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
     140             :                                TransactionId *subxids, TimestampTz timestamp,
     141             :                                RepOriginId nodeid)
     142             : {
     143             :     int         i;
     144             :     TransactionId headxid;
     145             :     TransactionId newestXact;
     146             : 
     147             :     /*
     148             :      * No-op if the module is not active.
     149             :      *
     150             :      * An unlocked read here is fine, because in a standby (the only place
     151             :      * where the flag can change in flight) this routine is only called by the
     152             :      * recovery process, which is also the only process which can change the
     153             :      * flag.
     154             :      */
     155      297212 :     if (!commitTsShared->commitTsActive)
     156      295508 :         return;
     157             : 
     158             :     /*
     159             :      * Figure out the latest Xid in this batch: either the last subxid if
     160             :      * there's any, otherwise the parent xid.
     161             :      */
     162        1704 :     if (nsubxids > 0)
     163           0 :         newestXact = subxids[nsubxids - 1];
     164             :     else
     165        1704 :         newestXact = xid;
     166             : 
     167             :     /*
     168             :      * We split the xids to set the timestamp to in groups belonging to the
     169             :      * same SLRU page; the first element in each such set is its head.  The
     170             :      * first group has the main XID as the head; subsequent sets use the first
     171             :      * subxid not on the previous page as head.  This way, we only have to
     172             :      * lock/modify each SLRU page once.
     173             :      */
     174        1704 :     headxid = xid;
     175        1704 :     i = 0;
     176             :     for (;;)
     177           0 :     {
     178        1704 :         int64       pageno = TransactionIdToCTsPage(headxid);
     179             :         int         j;
     180             : 
     181        1704 :         for (j = i; j < nsubxids; j++)
     182             :         {
     183           0 :             if (TransactionIdToCTsPage(subxids[j]) != pageno)
     184           0 :                 break;
     185             :         }
     186             :         /* subxids[i..j] are on the same page as the head */
     187             : 
     188        1704 :         SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
     189             :                              pageno);
     190             : 
     191             :         /* if we wrote out all subxids, we're done. */
     192        1704 :         if (j >= nsubxids)
     193        1704 :             break;
     194             : 
     195             :         /*
     196             :          * Set the new head and skip over it, as well as over the subxids we
     197             :          * just wrote.
     198             :          */
     199           0 :         headxid = subxids[j];
     200           0 :         i = j + 1;
     201             :     }
     202             : 
     203             :     /* update the cached value in shared memory */
     204        1704 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     205        1704 :     commitTsShared->xidLastCommit = xid;
     206        1704 :     commitTsShared->dataLastCommit.time = timestamp;
     207        1704 :     commitTsShared->dataLastCommit.nodeid = nodeid;
     208             : 
     209             :     /* and move forwards our endpoint, if needed */
     210        1704 :     if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
     211        1682 :         TransamVariables->newestCommitTsXid = newestXact;
     212        1704 :     LWLockRelease(CommitTsLock);
     213             : }
     214             : 
     215             : /*
     216             :  * Record the commit timestamp of transaction entries in the commit log for all
     217             :  * entries on a single page.  Atomic only on this page.
     218             :  */
     219             : static void
     220        1704 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     221             :                      TransactionId *subxids, TimestampTz ts,
     222             :                      RepOriginId nodeid, int64 pageno)
     223             : {
     224        1704 :     LWLock     *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     225             :     int         slotno;
     226             :     int         i;
     227             : 
     228        1704 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     229             : 
     230        1704 :     slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
     231             : 
     232        1704 :     TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
     233        1704 :     for (i = 0; i < nsubxids; i++)
     234           0 :         TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
     235             : 
     236        1704 :     CommitTsCtl->shared->page_dirty[slotno] = true;
     237             : 
     238        1704 :     LWLockRelease(lock);
     239        1704 : }
     240             : 
     241             : /*
     242             :  * Sets the commit timestamp of a single transaction.
     243             :  *
     244             :  * Caller must hold the correct SLRU bank lock, will be held at exit
     245             :  */
     246             : static void
     247        1704 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     248             :                          RepOriginId nodeid, int slotno)
     249             : {
     250        1704 :     int         entryno = TransactionIdToCTsEntry(xid);
     251             :     CommitTimestampEntry entry;
     252             : 
     253             :     Assert(TransactionIdIsNormal(xid));
     254             : 
     255        1704 :     entry.time = ts;
     256        1704 :     entry.nodeid = nodeid;
     257             : 
     258        1704 :     memcpy(CommitTsCtl->shared->page_buffer[slotno] +
     259        1704 :            SizeOfCommitTimestampEntry * entryno,
     260             :            &entry, SizeOfCommitTimestampEntry);
     261        1704 : }
     262             : 
     263             : /*
     264             :  * Interrogate the commit timestamp of a transaction.
     265             :  *
     266             :  * The return value indicates whether a commit timestamp record was found for
     267             :  * the given xid.  The timestamp value is returned in *ts (which may not be
     268             :  * null), and the origin node for the Xid is returned in *nodeid, if it's not
     269             :  * null.
     270             :  */
     271             : bool
     272          78 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
     273             :                              RepOriginId *nodeid)
     274             : {
     275          78 :     int64       pageno = TransactionIdToCTsPage(xid);
     276          78 :     int         entryno = TransactionIdToCTsEntry(xid);
     277             :     int         slotno;
     278             :     CommitTimestampEntry entry;
     279             :     TransactionId oldestCommitTsXid;
     280             :     TransactionId newestCommitTsXid;
     281             : 
     282          78 :     if (!TransactionIdIsValid(xid))
     283           6 :         ereport(ERROR,
     284             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     285             :                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
     286          72 :     else if (!TransactionIdIsNormal(xid))
     287             :     {
     288             :         /* frozen and bootstrap xids are always committed far in the past */
     289          12 :         *ts = 0;
     290          12 :         if (nodeid)
     291           4 :             *nodeid = 0;
     292          12 :         return false;
     293             :     }
     294             : 
     295          60 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     296             : 
     297             :     /* Error if module not enabled */
     298          60 :     if (!commitTsShared->commitTsActive)
     299           4 :         error_commit_ts_disabled();
     300             : 
     301             :     /*
     302             :      * If we're asked for the cached value, return that.  Otherwise, fall
     303             :      * through to read from SLRU.
     304             :      */
     305          56 :     if (commitTsShared->xidLastCommit == xid)
     306             :     {
     307          28 :         *ts = commitTsShared->dataLastCommit.time;
     308          28 :         if (nodeid)
     309          14 :             *nodeid = commitTsShared->dataLastCommit.nodeid;
     310             : 
     311          28 :         LWLockRelease(CommitTsLock);
     312          28 :         return *ts != 0;
     313             :     }
     314             : 
     315          28 :     oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
     316          28 :     newestCommitTsXid = TransamVariables->newestCommitTsXid;
     317             :     /* neither is invalid, or both are */
     318             :     Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
     319          28 :     LWLockRelease(CommitTsLock);
     320             : 
     321             :     /*
     322             :      * Return empty if the requested value is outside our valid range.
     323             :      */
     324          56 :     if (!TransactionIdIsValid(oldestCommitTsXid) ||
     325          50 :         TransactionIdPrecedes(xid, oldestCommitTsXid) ||
     326          22 :         TransactionIdPrecedes(newestCommitTsXid, xid))
     327             :     {
     328           6 :         *ts = 0;
     329           6 :         if (nodeid)
     330           0 :             *nodeid = InvalidRepOriginId;
     331           6 :         return false;
     332             :     }
     333             : 
     334             :     /* lock is acquired by SimpleLruReadPage_ReadOnly */
     335          22 :     slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
     336          22 :     memcpy(&entry,
     337          22 :            CommitTsCtl->shared->page_buffer[slotno] +
     338          22 :            SizeOfCommitTimestampEntry * entryno,
     339             :            SizeOfCommitTimestampEntry);
     340             : 
     341          22 :     *ts = entry.time;
     342          22 :     if (nodeid)
     343           8 :         *nodeid = entry.nodeid;
     344             : 
     345          22 :     LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
     346          22 :     return *ts != 0;
     347             : }
     348             : 
     349             : /*
     350             :  * Return the Xid of the latest committed transaction.  (As far as this module
     351             :  * is concerned, anyway; it's up to the caller to ensure the value is useful
     352             :  * for its purposes.)
     353             :  *
     354             :  * ts and nodeid are filled with the corresponding data; they can be passed
     355             :  * as NULL if not wanted.
     356             :  */
     357             : TransactionId
     358           6 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
     359             : {
     360             :     TransactionId xid;
     361             : 
     362           6 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     363             : 
     364             :     /* Error if module not enabled */
     365           6 :     if (!commitTsShared->commitTsActive)
     366           0 :         error_commit_ts_disabled();
     367             : 
     368           6 :     xid = commitTsShared->xidLastCommit;
     369           6 :     if (ts)
     370           6 :         *ts = commitTsShared->dataLastCommit.time;
     371           6 :     if (nodeid)
     372           6 :         *nodeid = commitTsShared->dataLastCommit.nodeid;
     373           6 :     LWLockRelease(CommitTsLock);
     374             : 
     375           6 :     return xid;
     376             : }
     377             : 
     378             : static void
     379           4 : error_commit_ts_disabled(void)
     380             : {
     381           4 :     ereport(ERROR,
     382             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     383             :              errmsg("could not get commit timestamp data"),
     384             :              RecoveryInProgress() ?
     385             :              errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
     386             :                      "track_commit_timestamp") :
     387             :              errhint("Make sure the configuration parameter \"%s\" is set.",
     388             :                      "track_commit_timestamp")));
     389             : }
     390             : 
     391             : /*
     392             :  * SQL-callable wrapper to obtain commit time of a transaction
     393             :  */
     394             : Datum
     395          50 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
     396             : {
     397          50 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     398             :     TimestampTz ts;
     399             :     bool        found;
     400             : 
     401          50 :     found = TransactionIdGetCommitTsData(xid, &ts, NULL);
     402             : 
     403          42 :     if (!found)
     404          14 :         PG_RETURN_NULL();
     405             : 
     406          28 :     PG_RETURN_TIMESTAMPTZ(ts);
     407             : }
     408             : 
     409             : 
     410             : /*
     411             :  * pg_last_committed_xact
     412             :  *
     413             :  * SQL-callable wrapper to obtain some information about the latest
     414             :  * committed transaction: transaction ID, timestamp and replication
     415             :  * origin.
     416             :  */
     417             : Datum
     418           6 : pg_last_committed_xact(PG_FUNCTION_ARGS)
     419             : {
     420             :     TransactionId xid;
     421             :     RepOriginId nodeid;
     422             :     TimestampTz ts;
     423             :     Datum       values[3];
     424             :     bool        nulls[3];
     425             :     TupleDesc   tupdesc;
     426             :     HeapTuple   htup;
     427             : 
     428             :     /* and construct a tuple with our data */
     429           6 :     xid = GetLatestCommitTsData(&ts, &nodeid);
     430             : 
     431           6 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     432           0 :         elog(ERROR, "return type must be a row type");
     433             : 
     434           6 :     if (!TransactionIdIsNormal(xid))
     435             :     {
     436           0 :         memset(nulls, true, sizeof(nulls));
     437             :     }
     438             :     else
     439             :     {
     440           6 :         values[0] = TransactionIdGetDatum(xid);
     441           6 :         nulls[0] = false;
     442             : 
     443           6 :         values[1] = TimestampTzGetDatum(ts);
     444           6 :         nulls[1] = false;
     445             : 
     446           6 :         values[2] = ObjectIdGetDatum((Oid) nodeid);
     447           6 :         nulls[2] = false;
     448             :     }
     449             : 
     450           6 :     htup = heap_form_tuple(tupdesc, values, nulls);
     451             : 
     452           6 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     453             : }
     454             : 
     455             : /*
     456             :  * pg_xact_commit_timestamp_origin
     457             :  *
     458             :  * SQL-callable wrapper to obtain commit timestamp and replication origin
     459             :  * of a given transaction.
     460             :  */
     461             : Datum
     462          10 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
     463             : {
     464          10 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     465             :     RepOriginId nodeid;
     466             :     TimestampTz ts;
     467             :     Datum       values[2];
     468             :     bool        nulls[2];
     469             :     TupleDesc   tupdesc;
     470             :     HeapTuple   htup;
     471             :     bool        found;
     472             : 
     473          10 :     found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
     474             : 
     475           8 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     476           0 :         elog(ERROR, "return type must be a row type");
     477             : 
     478           8 :     if (!found)
     479             :     {
     480           4 :         memset(nulls, true, sizeof(nulls));
     481             :     }
     482             :     else
     483             :     {
     484           4 :         values[0] = TimestampTzGetDatum(ts);
     485           4 :         nulls[0] = false;
     486             : 
     487           4 :         values[1] = ObjectIdGetDatum((Oid) nodeid);
     488           4 :         nulls[1] = false;
     489             :     }
     490             : 
     491           8 :     htup = heap_form_tuple(tupdesc, values, nulls);
     492             : 
     493           8 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     494             : }
     495             : 
     496             : /*
     497             :  * Number of shared CommitTS buffers.
     498             :  *
     499             :  * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
     500             :  * Otherwise just cap the configured amount to be between 16 and the maximum
     501             :  * allowed.
     502             :  */
     503             : static int
     504        8220 : CommitTsShmemBuffers(void)
     505             : {
     506             :     /* auto-tune based on shared buffers */
     507        8220 :     if (commit_timestamp_buffers == 0)
     508        6076 :         return SimpleLruAutotuneBuffers(512, 1024);
     509             : 
     510        2144 :     return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
     511             : }
     512             : 
     513             : /*
     514             :  * Shared memory sizing for CommitTs
     515             :  */
     516             : Size
     517        3962 : CommitTsShmemSize(void)
     518             : {
     519        3962 :     return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
     520             :         sizeof(CommitTimestampShared);
     521             : }
     522             : 
     523             : /*
     524             :  * Initialize CommitTs at system startup (postmaster start or standalone
     525             :  * backend)
     526             :  */
     527             : void
     528        2134 : CommitTsShmemInit(void)
     529             : {
     530             :     bool        found;
     531             : 
     532             :     /* If auto-tuning is requested, now is the time to do it */
     533        2134 :     if (commit_timestamp_buffers == 0)
     534             :     {
     535             :         char        buf[32];
     536             : 
     537        2124 :         snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
     538        2124 :         SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     539             :                         PGC_S_DYNAMIC_DEFAULT);
     540             : 
     541             :         /*
     542             :          * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
     543             :          * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
     544             :          * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
     545             :          * that and we must force the matter with PGC_S_OVERRIDE.
     546             :          */
     547        2124 :         if (commit_timestamp_buffers == 0)  /* failed to apply it? */
     548           0 :             SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     549             :                             PGC_S_OVERRIDE);
     550             :     }
     551             :     Assert(commit_timestamp_buffers != 0);
     552             : 
     553        2134 :     CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
     554        2134 :     SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
     555             :                   "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
     556             :                   LWTRANCHE_COMMITTS_SLRU,
     557             :                   SYNC_HANDLER_COMMIT_TS,
     558             :                   false);
     559             :     SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
     560             : 
     561        2134 :     commitTsShared = ShmemInitStruct("CommitTs shared",
     562             :                                      sizeof(CommitTimestampShared),
     563             :                                      &found);
     564             : 
     565        2134 :     if (!IsUnderPostmaster)
     566             :     {
     567             :         Assert(!found);
     568             : 
     569        2134 :         commitTsShared->xidLastCommit = InvalidTransactionId;
     570        2134 :         TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     571        2134 :         commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     572        2134 :         commitTsShared->commitTsActive = false;
     573             :     }
     574             :     else
     575             :         Assert(found);
     576        2134 : }
     577             : 
     578             : /*
     579             :  * GUC check_hook for commit_timestamp_buffers
     580             :  */
     581             : bool
     582        4334 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
     583             : {
     584        4334 :     return check_slru_buffers("commit_timestamp_buffers", newval);
     585             : }
     586             : 
     587             : /*
     588             :  * This function must be called ONCE on system install.  It is currently a no-op.
     589             :  *
     590             :  * (The CommitTs directory is assumed to have been created by initdb, and
     591             :  * CommitTsShmemInit must have been called already.)
     592             :  */
     593             : void
     594         102 : BootStrapCommitTs(void)
     595             : {
     596             :     /*
     597             :      * Intentionally empty at present, unlike most other SLRU modules; segments
     598             :      * are created when the server is started with this module enabled.  See
     599             :      * ActivateCommitTs.
     600             :      */
     601         102 : }
     602             : 
     603             : /*
     604             :  * This must be called ONCE during postmaster or standalone-backend startup,
     605             :  * after StartupXLOG has initialized TransamVariables->nextXid (ActivateCommitTs, which this unconditionally calls, reads it).
     606             :  */
     607             : void
     608          26 : StartupCommitTs(void)
     609             : {
     610          26 :     ActivateCommitTs();
     611          26 : }
     612             : 
     613             : /*
     614             :  * This must be called ONCE during postmaster or standalone-backend startup,
     615             :  * after recovery has finished.
     616             :  */
     617             : void
     618        1728 : CompleteCommitTsInitialization(void)
     619             : {
     620             :     /*
     621             :      * If the feature is not enabled, turn it off for good.  This also removes
     622             :      * any leftover data.
     623             :      *
     624             :      * Conversely, we activate the module if the feature is enabled.  This is
     625             :      * necessary for primary and standby as the activation depends on the
     626             :      * control file contents at the beginning of recovery or when an
     627             :      * XLOG_PARAMETER_CHANGE is replayed.
     628             :      */
     629        1728 :     if (!track_commit_timestamp)
     630        1690 :         DeactivateCommitTs();
     631             :     else
     632          38 :         ActivateCommitTs();
     633        1728 : }
     634             : 
     635             : /*
     636             :  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
     637             :  * XLog record during recovery.
     638             :  */
     639             : void
     640          64 : CommitTsParameterChange(bool newvalue, bool oldvalue)
     641             : {
     642             :     /*
     643             :      * If the commit_ts module is disabled in this server and we get word from
     644             :      * the primary server that it is enabled there, activate it so that we can
     645             :      * replay future WAL records involving it; also mark it as active on
     646             :      * pg_control.  If the old value was already set, we already did this, so
     647             :      * don't do anything (the test below uses the shared commitTsActive flag, not oldvalue).
     648             :      *
     649             :      * If the module is disabled in the primary, disable it here too, unless
     650             :      * the module is enabled locally.
     651             :      *
     652             :      * Note this only runs in the recovery process, so an unlocked read is
     653             :      * fine.
     654             :      */
     655          64 :     if (newvalue)
     656             :     {
     657           4 :         if (!commitTsShared->commitTsActive)
     658           0 :             ActivateCommitTs();
     659             :     }
     660          60 :     else if (commitTsShared->commitTsActive)
     661           2 :         DeactivateCommitTs();
     662          64 : }
     663             : 
     664             : /*
     665             :  * Activate this module whenever necessary.
     666             :  *      This must happen during postmaster or standalone-backend startup,
     667             :  *      or during WAL replay anytime the track_commit_timestamp setting is
     668             :  *      changed in the primary.
     669             :  *
     670             :  * The reason why this SLRU needs separate activation/deactivation functions is
     671             :  * that it can be enabled/disabled during start and the activation/deactivation
     672             :  * on the primary is propagated to the standby via replay. Other SLRUs don't
     673             :  * have this property and they can be just initialized during normal startup.
     674             :  *
     675             :  * This is in charge of creating the currently active segment, if it's not
     676             :  * already there.  The reason for this is that the server might have been
     677             :  * running with this module disabled for a while and thus might have skipped
     678             :  * the normal creation point.
     679             :  */
     680             : static void
     681          64 : ActivateCommitTs(void)
     682             : {
     683             :     TransactionId xid;
     684             :     int64       pageno;
     685             : 
     686             :     /*
     687             :      * During bootstrap, we should not register commit timestamps so skip the
     688             :      * activation in this case.
     689             :      */
     690          64 :     if (IsBootstrapProcessingMode())
     691           4 :         return;
     692             : 
     693             :     /* If the module is already marked active, there's nothing to do */
     694          60 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     695          60 :     if (commitTsShared->commitTsActive)
     696             :     {
     697          10 :         LWLockRelease(CommitTsLock);
     698          10 :         return;
     699             :     }
     700          50 :     LWLockRelease(CommitTsLock);
     701             : 
     702          50 :     xid = XidFromFullTransactionId(TransamVariables->nextXid);
     703          50 :     pageno = TransactionIdToCTsPage(xid);
     704             : 
     705             :     /*
     706             :      * Re-initialize our idea of the latest page number (the page holding nextXid).
     707             :      */
     708          50 :     pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
     709             : 
     710             :     /*
     711             :      * If CommitTs is enabled, but it wasn't in the previous server run, we
     712             :      * need to set the oldest and newest values to the next Xid; that way, we
     713             :      * will not try to read data that might not have been set.
     714             :      *
     715             :      * XXX does this have a problem if a server is started with commitTs
     716             :      * enabled, then started with commitTs disabled, then restarted with it
     717             :      * enabled again?  It doesn't look like it does, because there should be a
     718             :      * checkpoint that sets the value to InvalidTransactionId at end of
     719             :      * recovery; and so any chance of injecting new transactions without
     720             :      * CommitTs values would occur after the oldestCommitTsXid has been set to
     721             :      * Invalid temporarily.
     722             :      */
     723          50 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     724          50 :     if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
     725             :     {
     726          28 :         TransamVariables->oldestCommitTsXid =
     727          28 :             TransamVariables->newestCommitTsXid = ReadNextTransactionId();
     728             :     }
     729          50 :     LWLockRelease(CommitTsLock);
     730             : 
     731             :     /* Create the current segment file, if necessary */
     732          50 :     if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
     733          24 :         SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
     734             : 
     735             :     /* Finally, mark the module active in shared memory. */
     736          50 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     737          50 :     commitTsShared->commitTsActive = true;
     738          50 :     LWLockRelease(CommitTsLock);
     739             : }
     740             : 
     741             : /*
     742             :  * Deactivate this module.
     743             :  *
     744             :  * This must be called when the track_commit_timestamp parameter is turned off.
     745             :  * This happens during postmaster or standalone-backend startup, or during WAL
     746             :  * replay.
     747             :  *
     748             :  * Resets CommitTs into invalid state to make sure we don't hand back
     749             :  * possibly-invalid data; also removes segments of old data.
     750             :  */
     751             : static void
     752        1692 : DeactivateCommitTs(void)
     753             : {
     754             :     /*
     755             :      * Cleanup the status in the shared memory.
     756             :      *
     757             :      * We reset everything in the commitTsShared record to prevent users from
     758             :      * getting confusing data about last committed transaction on the standby
     759             :      * when the module was activated repeatedly on the primary.
     760             :      */
     761        1692 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     762             : 
     763        1692 :     commitTsShared->commitTsActive = false;
     764        1692 :     commitTsShared->xidLastCommit = InvalidTransactionId;
     765        1692 :     TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     766        1692 :     commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     767             : 
     768        1692 :     TransamVariables->oldestCommitTsXid = InvalidTransactionId;
     769        1692 :     TransamVariables->newestCommitTsXid = InvalidTransactionId;
     770             : 
     771             :     /*
     772             :      * Remove *all* files.  This is necessary so that there are no leftover
     773             :      * files; in the case where this feature is later enabled after running
     774             :      * with it disabled for some time there may be a gap in the file sequence.
     775             :      * (We can probably tolerate out-of-sequence files, as they are going to
     776             :      * be overwritten anyway when we wrap around, but it seems better to be
     777             :      * tidy.)
     778             :      *
     779             :      * Note that we do this with CommitTsLock acquired in exclusive mode. This
     780             :      * is very heavy-handed, but since this routine only runs at startup or
     781             :      * during WAL replay and should happen very rarely, we don't worry too much about
     782             :      * it.  Note also that no process should be consulting this SLRU if we
     783             :      * have just deactivated it.
     784             :      */
     785        1692 :     (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
     786             : 
     787        1692 :     LWLockRelease(CommitTsLock);
     788        1692 : }
     789             : 
     790             : /*
     791             :  * Perform a checkpoint --- either during shutdown, or on-the-fly --- by calling SimpleLruWriteAll() on the CommitTs SLRU.
     792             :  */
     793             : void
     794        3366 : CheckPointCommitTs(void)
     795             : {
     796             :     /*
     797             :      * Write dirty CommitTs pages to disk.  This may result in sync requests
     798             :      * queued for later handling by ProcessSyncRequests(), as part of the
     799             :      * checkpoint.
     800             :      */
     801        3366 :     SimpleLruWriteAll(CommitTsCtl, true);
     802        3366 : }
     803             : 
     804             : /*
     805             :  * Make sure that CommitTs has room for a newly-allocated XID.
     806             :  *
     807             :  * NB: this is called while holding XidGenLock.  We want it to be very fast
     808             :  * most of the time; even when it's not so fast, no actual I/O need happen
     809             :  * unless we're forced to write out a dirty CommitTs or xlog page to make room
     810             :  * in shared memory.
     811             :  *
     812             :  * NB: the current implementation relies on track_commit_timestamp being
     813             :  * PGC_POSTMASTER.
     814             :  */
     815             : void
     816    49000392 : ExtendCommitTs(TransactionId newestXact)
     817             : {
     818             :     int64       pageno;
     819             :     LWLock     *lock;
     820             : 
     821             :     /*
     822             :      * Nothing to do if module not enabled.  Note we do an unlocked read of
     823             :      * the flag here, which is okay because this routine is only called from
     824             :      * GetNewTransactionId, which is never called in a standby.
     825             :      */
     826             :     Assert(!InRecovery);
     827    49000392 :     if (!commitTsShared->commitTsActive)
     828    48998706 :         return;
     829             : 
     830             :     /*
     831             :      * No work except at first XID of a page.  But beware: just after
     832             :      * wraparound, the first XID of page zero is FirstNormalTransactionId.
     833             :      */
     834        1686 :     if (TransactionIdToCTsEntry(newestXact) != 0 &&
     835             :         !TransactionIdEquals(newestXact, FirstNormalTransactionId))
     836        1684 :         return;
     837             : 
     838           2 :     pageno = TransactionIdToCTsPage(newestXact);
     839             : 
     840           2 :     lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     841             : 
     842           2 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     843             : 
     844             :     /* Zero the page while holding its bank lock ... */
     845           2 :     SimpleLruZeroPage(CommitTsCtl, pageno);
     846             : 
     847             :     /* and make a WAL entry about that, unless we're in REDO (which the Assert above already rules out) */
     848           2 :     if (!InRecovery)
     849           2 :         XLogSimpleInsertInt64(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE, pageno);
     850             : 
     851           2 :     LWLockRelease(lock);
     852             : }
     853             : 
     854             : /*
     855             :  * Remove all CommitTs segments before the one holding the passed
     856             :  * transaction ID.
     857             :  *
     858             :  * Note that we don't need to flush XLOG here.
     859             :  */
     860             : void
     861        2390 : TruncateCommitTs(TransactionId oldestXact)
     862             : {
     863             :     int64       cutoffPage;
     864             : 
     865             :     /*
     866             :      * The cutoff point is the start of the segment containing oldestXact. We
     867             :      * pass the *page* containing oldestXact to SimpleLruTruncate.
     868             :      */
     869        2390 :     cutoffPage = TransactionIdToCTsPage(oldestXact);
     870             : 
     871             :     /* Check to see if there are any files that could be removed */
     872        2390 :     if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
     873             :                            &cutoffPage))
     874        2390 :         return;                 /* nothing to remove */
     875             : 
     876             :     /* Write XLOG record before removing anything */
     877           0 :     WriteTruncateXlogRec(cutoffPage, oldestXact);
     878             : 
     879             :     /* Now we can remove the old CommitTs segment(s) */
     880           0 :     SimpleLruTruncate(CommitTsCtl, cutoffPage);
     881             : }
     882             : 
     883             : /*
     884             :  * Set the limit values between which commit TS can be consulted.
     885             :  */
     886             : void
     887        1948 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
     888             : {
     889             :     /*
     890             :      * Be careful not to overwrite values that are either further into the
     891             :      * "future" or signal a disabled committs.  NOTE(review): the second test below moves newestCommitTsXid backwards when newestXact precedes it; confirm that this is the intended direction.
     892             :      */
     893        1948 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     894        1948 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
     895             :     {
     896           0 :         if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     897           0 :             TransamVariables->oldestCommitTsXid = oldestXact;
     898           0 :         if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
     899           0 :             TransamVariables->newestCommitTsXid = newestXact;
     900             :     }
     901             :     else
     902             :     {
     903             :         Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
     904        1948 :         TransamVariables->oldestCommitTsXid = oldestXact;
     905        1948 :         TransamVariables->newestCommitTsXid = newestXact;
     906             :     }
     907        1948 :     LWLockRelease(CommitTsLock);
     908        1948 : }
     909             : 
     910             : /*
     911             :  * Move forward the oldest commitTS value that can be consulted.  Does nothing when the stored value is InvalidTransactionId or does not precede oldestXact.
     912             :  */
     913             : void
     914        2390 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
     915             : {
     916        2390 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     917        2392 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
     918           2 :         TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     919           2 :         TransamVariables->oldestCommitTsXid = oldestXact;
     920        2390 :     LWLockRelease(CommitTsLock);
     921        2390 : }
     922             : 
     923             : 
     924             : /*
     925             :  * Decide whether a commitTS page number is "older" for truncation purposes.
     926             :  * Analogous to CLOGPagePrecedes().  Returns true only if the XID derived from page1 precedes both the XID derived from page2 and that XID plus COMMIT_TS_XACTS_PER_PAGE - 1.
     927             :  *
     928             :  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
     929             :  * introduces differences compared to CLOG and the other SLRUs having (1 <<
     930             :  * 31) % per_page == 0.  This function never tests exactly
     931             :  * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
     932             :  * there are two possible counts of page boundaries between oldestXact and the
     933             :  * latest XID assigned, depending on whether oldestXact is within the first
     934             :  * 128 entries of its page.  Since this function doesn't know the location of
     935             :  * oldestXact within page2, it returns false for one page that actually is
     936             :  * expendable.  This is a wider (yet still negligible) version of the
     937             :  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
     938             :  *
     939             :  * For the sake of a worked example, number entries with decimal values such
     940             :  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
     941             :  * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
     942             :  * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
     943             :  * because entry=2.85 is the border that toggles whether entries precede the
     944             :  * last entry of the oldestXact page.  While page 2 is expendable at
     945             :  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
     946             :  */
     947             : static bool
     948           2 : CommitTsPagePrecedes(int64 page1, int64 page2)
     949             : {
     950             :     TransactionId xid1;
     951             :     TransactionId xid2;
     952             : 
     953           2 :     xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
     954           2 :     xid1 += FirstNormalTransactionId + 1;
     955           2 :     xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
     956           2 :     xid2 += FirstNormalTransactionId + 1;
     957             : 
     958           2 :     return (TransactionIdPrecedes(xid1, xid2) &&
     959           0 :             TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
     960             : }
     961             : 
     962             : 
     963             : /*
     964             :  * Write a TRUNCATE xlog record carrying the given page number and oldestXid
     965             :  */
     966             : static void
     967           0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
     968             : {
     969             :     xl_commit_ts_truncate xlrec;
     970             : 
     971           0 :     xlrec.pageno = pageno;
     972           0 :     xlrec.oldestXid = oldestXid;
     973             : 
     974           0 :     XLogBeginInsert();
     975           0 :     XLogRegisterData(&xlrec, SizeOfCommitTsTruncate);
     976           0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
     977           0 : }
     978             : 
     979             : /*
     980             :  * CommitTS resource manager's routines.  commit_ts_redo replays COMMIT_TS_ZEROPAGE and COMMIT_TS_TRUNCATE records; any other op code raises PANIC.
     981             :  */
     982             : void
     983           0 : commit_ts_redo(XLogReaderState *record)
     984             : {
     985           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     986             : 
     987             :     /* Backup blocks are not used in commit_ts records */
     988             :     Assert(!XLogRecHasAnyBlockRefs(record));
     989             : 
     990           0 :     if (info == COMMIT_TS_ZEROPAGE)
     991             :     {
     992             :         int64       pageno;
     993             : 
     994           0 :         memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
     995           0 :         SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
     996             :     }
     997           0 :     else if (info == COMMIT_TS_TRUNCATE)
     998             :     {
     999           0 :         xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
    1000             : 
    1001           0 :         AdvanceOldestCommitTsXid(trunc->oldestXid);
    1002             : 
    1003             :         /*
    1004             :          * During XLOG replay, latest_page_number isn't set up yet; insert a
    1005             :          * suitable value to bypass the sanity test in SimpleLruTruncate.
    1006             :          */
    1007           0 :         pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
    1008           0 :                             trunc->pageno);
    1009             : 
    1010           0 :         SimpleLruTruncate(CommitTsCtl, trunc->pageno);
    1011             :     }
    1012             :     else
    1013           0 :         elog(PANIC, "commit_ts_redo: unknown op code %u", info);
    1014           0 : }
    1015             : 
    1016             : /*
    1017             :  * Entrypoint for sync.c to sync commit_ts files; forwards to SlruSyncFileTag() and returns its result.
    1018             :  */
    1019             : int
    1020           0 : committssyncfiletag(const FileTag *ftag, char *path)
    1021             : {
    1022           0 :     return SlruSyncFileTag(CommitTsCtl, ftag, path);
    1023             : }

Generated by: LCOV version 1.16