Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * commit_ts.c
4 : * PostgreSQL commit timestamp manager
5 : *
6 : * This module is a pg_xact-like system that stores the commit timestamp
7 : * for each transaction.
8 : *
9 : * XLOG interactions: this module generates an XLOG record whenever a new
10 : * CommitTs page is initialized to zeroes, and another one whenever old CommitTs segments are truncated. Other writes of CommitTs come
11 : * from recording of transaction commit in xact.c, which generates its own
12 : * XLOG records for these events and will re-perform the status update on
13 : * redo; so we need not make any additional XLOG entry here.
14 : *
15 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
16 : * Portions Copyright (c) 1994, Regents of the University of California
17 : *
18 : * src/backend/access/transam/commit_ts.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres.h"
23 :
24 : #include "access/commit_ts.h"
25 : #include "access/htup_details.h"
26 : #include "access/slru.h"
27 : #include "access/transam.h"
28 : #include "access/xloginsert.h"
29 : #include "access/xlogutils.h"
30 : #include "funcapi.h"
31 : #include "miscadmin.h"
32 : #include "storage/shmem.h"
33 : #include "utils/fmgrprotos.h"
34 : #include "utils/guc_hooks.h"
35 : #include "utils/timestamp.h"
36 :
37 : /*
38 : * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
39 : * everywhere else in Postgres.
40 : *
41 : * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
42 : * CommitTs page numbering also wraps around at
43 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
44 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
45 : * explicit notice of that fact in this module, except when comparing segment
46 : * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
47 : */
48 :
49 : /*
50 : * One CommitTs entry, stored once per transaction id. We need 8+2 bytes per xact. Note that enlarging this struct might mean
51 : * the largest possible file name is more than 5 chars long; see
52 : * SlruScanDirectory.
53 : */
54 : typedef struct CommitTimestampEntry
55 : {
56 : TimestampTz time; /* commit timestamp; readers in this file treat 0 as "no data" */
57 : RepOriginId nodeid; /* replication origin recorded with the commit */
58 : } CommitTimestampEntry;
59 : /* On-page size of one entry: ends right after nodeid, so any trailing struct padding is excluded. */
60 : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
61 : sizeof(RepOriginId))
62 : /* Number of entries that fit on one BLCKSZ-sized page. */
63 : #define COMMIT_TS_XACTS_PER_PAGE \
64 : (BLCKSZ / SizeOfCommitTimestampEntry)
65 :
66 :
67 : /*
68 : * Return the number of the CommitTs page that holds the entry for xid. Although we return an int64 the actual value can't currently exceed
69 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
70 : */
71 : static inline int64
72 4224 : TransactionIdToCTsPage(TransactionId xid)
73 : {
74 4224 : return xid / (int64) COMMIT_TS_XACTS_PER_PAGE; /* the int64 cast makes the division happen in 64-bit arithmetic */
75 : }
76 : /* Index of xid's entry within its CommitTs page (0 .. COMMIT_TS_XACTS_PER_PAGE - 1). */
77 : #define TransactionIdToCTsEntry(xid) \
78 : ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79 :
80 : /*
81 : * Link to shared-memory data structures for CommitTs control. The SLRU itself is set up by SimpleLruInit in CommitTsShmemInit.
82 : */
83 : static SlruCtlData CommitTsCtlData;
84 :
85 : #define CommitTsCtl (&CommitTsCtlData) /* pointer form passed to the SLRU routines throughout this file */
86 :
87 : /*
88 : * We keep a cache of the last value set in shared memory.
89 : *
90 : * This is also a good place to keep the activation status. We keep this
91 : * separate from the GUC so that the standby can activate the module if the
92 : * primary has it active independently of the value of the GUC.
93 : *
94 : * This is protected by CommitTsLock. In some places, we use commitTsActive
95 : * without acquiring the lock; where this happens, a comment explains the
96 : * rationale for it.
97 : */
98 : typedef struct CommitTimestampShared
99 : {
100 : TransactionId xidLastCommit; /* top-level xid of the last TransactionTreeSetCommitTsData call, or InvalidTransactionId */
101 : CommitTimestampEntry dataLastCommit; /* timestamp and origin recorded for xidLastCommit */
102 : bool commitTsActive; /* set by ActivateCommitTs, cleared by DeactivateCommitTs */
103 : } CommitTimestampShared;
104 :
105 : static CommitTimestampShared *commitTsShared; /* assigned in CommitTsShmemInit */
106 :
107 :
108 : /* GUC variable: the configured setting only; the effective activation state is commitTsShared->commitTsActive */
109 : bool track_commit_timestamp;
110 : /* Prototypes for routines local to this file */
111 : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
112 : TransactionId *subxids, TimestampTz ts,
113 : RepOriginId nodeid, int64 pageno);
114 : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
115 : RepOriginId nodeid, int slotno);
116 : static void error_commit_ts_disabled(void);
117 : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
118 : static void ActivateCommitTs(void);
119 : static void DeactivateCommitTs(void);
120 : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
121 :
122 : /*
123 : * TransactionTreeSetCommitTsData
124 : *
125 : * Record the final commit timestamp of transaction entries in the CommitTs SLRU
126 : * for a transaction and its subtransaction tree, as efficiently as possible.
127 : *
128 : * xid is the top level transaction id. timestamp and nodeid are the values stored for xid and for every subxid. This is a no-op if the module is not active.
129 : *
130 : * subxids is an array of xids of length nsubxids, representing subtransactions
131 : * in the tree of xid. In various cases nsubxids may be zero.
132 : * The reason why tracking just the parent xid commit timestamp is not enough
133 : * is that the subtrans SLRU does not stay valid across crashes (it's not
134 : * permanent) so we need to keep the information about them here. If the
135 : * subtrans implementation changes in the future, we might want to revisit the
136 : * decision of storing timestamp info for each subxid.
137 : */
138 : void
139 297212 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
140 : TransactionId *subxids, TimestampTz timestamp,
141 : RepOriginId nodeid)
142 : {
143 : int i;
144 : TransactionId headxid;
145 : TransactionId newestXact;
146 :
147 : /*
148 : * No-op if the module is not active.
149 : *
150 : * An unlocked read here is fine, because in a standby (the only place
151 : * where the flag can change in flight) this routine is only called by the
152 : * recovery process, which is also the only process which can change the
153 : * flag.
154 : */
155 297212 : if (!commitTsShared->commitTsActive)
156 295508 : return;
157 :
158 : /*
159 : * Figure out the latest Xid in this batch: either the last subxid if
160 : * there's any, otherwise the parent xid. (This takes the last array element as the newest; presumably subxids is in ascending xid order -- TODO confirm against callers.)
161 : */
162 1704 : if (nsubxids > 0)
163 0 : newestXact = subxids[nsubxids - 1];
164 : else
165 1704 : newestXact = xid;
166 :
167 : /*
168 : * We split the xids to set the timestamp to in groups belonging to the
169 : * same SLRU page; the first element in each such set is its head. The
170 : * first group has the main XID as the head; subsequent sets use the first
171 : * subxid not on the previous page as head. This way, we only have to
172 : * lock/modify each SLRU page once.
173 : */
174 1704 : headxid = xid;
175 1704 : i = 0;
176 : for (;;)
177 0 : {
178 1704 : int64 pageno = TransactionIdToCTsPage(headxid);
179 : int j;
180 :
181 1704 : for (j = i; j < nsubxids; j++)
182 : {
183 0 : if (TransactionIdToCTsPage(subxids[j]) != pageno)
184 0 : break;
185 : }
186 : /* subxids[i..j-1] are on the same page as the head; j is the first one that is not (or nsubxids) */
187 :
188 1704 : SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
189 : pageno);
190 :
191 : /* if we wrote out all subxids, we're done. */
192 1704 : if (j >= nsubxids)
193 1704 : break;
194 :
195 : /*
196 : * Set the new head and skip over it, as well as over the subxids we
197 : * just wrote.
198 : */
199 0 : headxid = subxids[j];
200 0 : i = j + 1;
201 : }
202 :
203 : /* update the cached value in shared memory; the cache always describes the top-level xid */
204 1704 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
205 1704 : commitTsShared->xidLastCommit = xid;
206 1704 : commitTsShared->dataLastCommit.time = timestamp;
207 1704 : commitTsShared->dataLastCommit.nodeid = nodeid;
208 :
209 : /* and move forwards our endpoint, if needed */
210 1704 : if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
211 1682 : TransamVariables->newestCommitTsXid = newestXact;
212 1704 : LWLockRelease(CommitTsLock);
213 : }
214 :
215 : /*
216 : * Record the commit timestamp of transaction entries in the CommitTs SLRU for all
217 : * entries on a single page. Atomic only on this page. The caller must pass a pageno that holds xid and all nsubxids entries of subxids; the page's bank lock is taken and released here.
218 : */
219 : static void
220 1704 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
221 : TransactionId *subxids, TimestampTz ts,
222 : RepOriginId nodeid, int64 pageno)
223 : {
224 1704 : LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
225 : int slotno;
226 : int i;
227 :
228 1704 : LWLockAcquire(lock, LW_EXCLUSIVE);
229 :
230 1704 : slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); /* buffer slot now holding the page */
231 :
232 1704 : TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
233 1704 : for (i = 0; i < nsubxids; i++)
234 0 : TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
235 :
236 1704 : CommitTsCtl->shared->page_dirty[slotno] = true; /* mark the modified buffer dirty */
237 :
238 1704 : LWLockRelease(lock);
239 1704 : }
240 :
241 : /*
242 : * Sets the commit timestamp of a single transaction by overwriting its entry in the page buffer at slotno. The page is not marked dirty here; the caller does that.
243 : *
244 : * Caller must hold the correct SLRU bank lock, will be held at exit
245 : */
246 : static void
247 1704 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
248 : RepOriginId nodeid, int slotno)
249 : {
250 1704 : int entryno = TransactionIdToCTsEntry(xid);
251 : CommitTimestampEntry entry;
252 :
253 : Assert(TransactionIdIsNormal(xid));
254 :
255 1704 : entry.time = ts;
256 1704 : entry.nodeid = nodeid;
257 : /* copy only SizeOfCommitTimestampEntry bytes: on-page entries are packed at that stride */
258 1704 : memcpy(CommitTsCtl->shared->page_buffer[slotno] +
259 1704 : SizeOfCommitTimestampEntry * entryno,
260 : &entry, SizeOfCommitTimestampEntry);
261 1704 : }
262 :
263 : /*
264 : * Interrogate the commit timestamp of a transaction.
265 : *
266 : * The return value indicates whether a commit timestamp record was found for
267 : * the given xid. The timestamp value is returned in *ts (which may not be
268 : * null), and the origin node for the Xid is returned in *nodeid, if it's not
269 : * null. An ERROR is reported for an invalid xid, or if the module is not active; other non-normal xids and xids outside the known range yield false with *ts = 0.
270 : */
271 : bool
272 78 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
273 : RepOriginId *nodeid)
274 : {
275 78 : int64 pageno = TransactionIdToCTsPage(xid);
276 78 : int entryno = TransactionIdToCTsEntry(xid);
277 : int slotno;
278 : CommitTimestampEntry entry;
279 : TransactionId oldestCommitTsXid;
280 : TransactionId newestCommitTsXid;
281 :
282 78 : if (!TransactionIdIsValid(xid))
283 6 : ereport(ERROR,
284 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
285 : errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
286 72 : else if (!TransactionIdIsNormal(xid))
287 : {
288 : /* frozen and bootstrap xids are always committed far in the past */
289 12 : *ts = 0;
290 12 : if (nodeid)
291 4 : *nodeid = 0; /* NOTE(review): the out-of-range path below uses InvalidRepOriginId instead of a literal 0 */
292 12 : return false;
293 : }
294 :
295 60 : LWLockAcquire(CommitTsLock, LW_SHARED);
296 :
297 : /* Error if module not enabled (checked while CommitTsLock is held) */
298 60 : if (!commitTsShared->commitTsActive)
299 4 : error_commit_ts_disabled();
300 :
301 : /*
302 : * If we're asked for the cached value, return that. Otherwise, fall
303 : * through to read from SLRU.
304 : */
305 56 : if (commitTsShared->xidLastCommit == xid)
306 : {
307 28 : *ts = commitTsShared->dataLastCommit.time;
308 28 : if (nodeid)
309 14 : *nodeid = commitTsShared->dataLastCommit.nodeid;
310 :
311 28 : LWLockRelease(CommitTsLock);
312 28 : return *ts != 0;
313 : }
314 : /* snapshot the valid xid range under the lock, then release it before touching the SLRU */
315 28 : oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
316 28 : newestCommitTsXid = TransamVariables->newestCommitTsXid;
317 : /* neither is invalid, or both are */
318 : Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
319 28 : LWLockRelease(CommitTsLock);
320 :
321 : /*
322 : * Return empty if the requested value is outside our valid range.
323 : */
324 56 : if (!TransactionIdIsValid(oldestCommitTsXid) ||
325 50 : TransactionIdPrecedes(xid, oldestCommitTsXid) ||
326 22 : TransactionIdPrecedes(newestCommitTsXid, xid))
327 : {
328 6 : *ts = 0;
329 6 : if (nodeid)
330 0 : *nodeid = InvalidRepOriginId;
331 6 : return false;
332 : }
333 :
334 : /* lock is acquired by SimpleLruReadPage_ReadOnly; it is the page's bank lock, released below */
335 22 : slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
336 22 : memcpy(&entry,
337 22 : CommitTsCtl->shared->page_buffer[slotno] +
338 22 : SizeOfCommitTimestampEntry * entryno,
339 : SizeOfCommitTimestampEntry);
340 :
341 22 : *ts = entry.time;
342 22 : if (nodeid)
343 8 : *nodeid = entry.nodeid;
344 :
345 22 : LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
346 22 : return *ts != 0; /* a zero timestamp in the entry is reported as "not found" */
347 : }
348 :
349 : /*
350 : * Return the Xid of the latest committed transaction. (As far as this module
351 : * is concerned, anyway; it's up to the caller to ensure the value is useful
352 : * for its purposes.) The value comes from the shared-memory cache, not from the SLRU.
353 : *
354 : * ts and nodeid are filled with the corresponding data; they can be passed
355 : * as NULL if not wanted. An ERROR is reported if the module is not active.
356 : */
357 : TransactionId
358 6 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
359 : {
360 : TransactionId xid;
361 :
362 6 : LWLockAcquire(CommitTsLock, LW_SHARED);
363 :
364 : /* Error if module not enabled */
365 6 : if (!commitTsShared->commitTsActive)
366 0 : error_commit_ts_disabled();
367 : /* read all three cached fields under the same lock acquisition */
368 6 : xid = commitTsShared->xidLastCommit;
369 6 : if (ts)
370 6 : *ts = commitTsShared->dataLastCommit.time;
371 6 : if (nodeid)
372 6 : *nodeid = commitTsShared->dataLastCommit.nodeid;
373 6 : LWLockRelease(CommitTsLock);
374 :
375 6 : return xid;
376 : }
377 : /* Report the ERROR used when commit timestamp data is requested while the module is inactive; the hint differs when RecoveryInProgress() is true. */
378 : static void
379 4 : error_commit_ts_disabled(void)
380 : {
381 4 : ereport(ERROR,
382 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
383 : errmsg("could not get commit timestamp data"),
384 : RecoveryInProgress() ?
385 : errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
386 : "track_commit_timestamp") :
387 : errhint("Make sure the configuration parameter \"%s\" is set.",
388 : "track_commit_timestamp")));
389 : }
390 :
391 : /*
392 : * SQL-callable wrapper to obtain commit time of a transaction. Takes the xid as argument 0 and returns NULL when TransactionIdGetCommitTsData finds no record.
393 : */
394 : Datum
395 50 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
396 : {
397 50 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
398 : TimestampTz ts;
399 : bool found;
400 :
401 50 : found = TransactionIdGetCommitTsData(xid, &ts, NULL); /* origin not wanted here */
402 :
403 42 : if (!found)
404 14 : PG_RETURN_NULL();
405 :
406 28 : PG_RETURN_TIMESTAMPTZ(ts);
407 : }
408 :
409 :
410 : /*
411 : * pg_last_committed_xact
412 : *
413 : * SQL-callable wrapper to obtain some information about the latest
414 : * committed transaction: transaction ID, timestamp and replication
415 : * origin. Returns a three-column row; all columns are NULL when the cached xid is not a normal xid.
416 : */
417 : Datum
418 6 : pg_last_committed_xact(PG_FUNCTION_ARGS)
419 : {
420 : TransactionId xid;
421 : RepOriginId nodeid;
422 : TimestampTz ts;
423 : Datum values[3];
424 : bool nulls[3];
425 : TupleDesc tupdesc;
426 : HeapTuple htup;
427 :
428 : /* fetch the cached data about the latest commit */
429 6 : xid = GetLatestCommitTsData(&ts, &nodeid);
430 :
431 6 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
432 0 : elog(ERROR, "return type must be a row type");
433 : /* and construct a tuple with our data */
434 6 : if (!TransactionIdIsNormal(xid))
435 : {
436 0 : memset(nulls, true, sizeof(nulls)); /* values[] is left unset; every column is flagged NULL */
437 : }
438 : else
439 : {
440 6 : values[0] = TransactionIdGetDatum(xid);
441 6 : nulls[0] = false;
442 :
443 6 : values[1] = TimestampTzGetDatum(ts);
444 6 : nulls[1] = false;
445 :
446 6 : values[2] = ObjectIdGetDatum((Oid) nodeid);
447 6 : nulls[2] = false;
448 : }
449 :
450 6 : htup = heap_form_tuple(tupdesc, values, nulls);
451 :
452 6 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
453 : }
454 :
455 : /*
456 : * pg_xact_commit_timestamp_origin
457 : *
458 : * SQL-callable wrapper to obtain commit timestamp and replication origin
459 : * of a given transaction. Returns a two-column row; both columns are NULL when no record is found.
460 : */
461 : Datum
462 10 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
463 : {
464 10 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
465 : RepOriginId nodeid;
466 : TimestampTz ts;
467 : Datum values[2];
468 : bool nulls[2];
469 : TupleDesc tupdesc;
470 : HeapTuple htup;
471 : bool found;
472 :
473 10 : found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
474 :
475 8 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
476 0 : elog(ERROR, "return type must be a row type");
477 :
478 8 : if (!found)
479 : {
480 4 : memset(nulls, true, sizeof(nulls)); /* values[] is left unset; every column is flagged NULL */
481 : }
482 : else
483 : {
484 4 : values[0] = TimestampTzGetDatum(ts);
485 4 : nulls[0] = false;
486 :
487 4 : values[1] = ObjectIdGetDatum((Oid) nodeid);
488 4 : nulls[1] = false;
489 : }
490 :
491 8 : htup = heap_form_tuple(tupdesc, values, nulls);
492 :
493 8 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
494 : }
495 :
496 : /*
497 : * Number of shared CommitTS buffers.
498 : *
499 : * If asked to autotune (commit_timestamp_buffers == 0), use 2MB for every 1GB of shared buffers, up to 8MB.
500 : * Otherwise just cap the configured amount to be between 16 and the maximum
501 : * allowed (SLRU_MAX_ALLOWED_BUFFERS).
502 : */
503 : static int
504 8220 : CommitTsShmemBuffers(void)
505 : {
506 : /* auto-tune based on shared buffers */
507 8220 : if (commit_timestamp_buffers == 0)
508 6076 : return SimpleLruAutotuneBuffers(512, 1024);
509 : /* explicit setting: clamp into [16, SLRU_MAX_ALLOWED_BUFFERS] */
510 2144 : return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
511 : }
512 :
513 : /*
514 : * Shared memory sizing for CommitTs: the SLRU buffers plus the CommitTimestampShared struct.
515 : */
516 : Size
517 3962 : CommitTsShmemSize(void)
518 : {
519 3962 : return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
520 : sizeof(CommitTimestampShared);
521 : }
522 :
523 : /*
524 : * Initialize CommitTs at system startup (postmaster start or standalone
525 : * backend): resolve the autotuned buffer count, set up the SLRU, and create or attach to the CommitTimestampShared struct.
526 : */
527 : void
528 2134 : CommitTsShmemInit(void)
529 : {
530 : bool found;
531 :
532 : /* If auto-tuning is requested, now is the time to do it */
533 2134 : if (commit_timestamp_buffers == 0)
534 : {
535 : char buf[32];
536 :
537 2124 : snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
538 2124 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
539 : PGC_S_DYNAMIC_DEFAULT);
540 :
541 : /*
542 : * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
543 : * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
544 : * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
545 : * that and we must force the matter with PGC_S_OVERRIDE.
546 : */
547 2124 : if (commit_timestamp_buffers == 0) /* failed to apply it? */
548 0 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
549 : PGC_S_OVERRIDE);
550 : }
551 : Assert(commit_timestamp_buffers != 0);
552 : /* PagePrecedes is assigned before SimpleLruInit is called */
553 2134 : CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
554 2134 : SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
555 : "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
556 : LWTRANCHE_COMMITTS_SLRU,
557 : SYNC_HANDLER_COMMIT_TS,
558 : false);
559 : SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
560 :
561 2134 : commitTsShared = ShmemInitStruct("CommitTs shared",
562 : sizeof(CommitTimestampShared),
563 : &found);
564 :
565 2134 : if (!IsUnderPostmaster)
566 : {
567 : Assert(!found);
568 : /* struct was just created: start with an empty cache and the module inactive */
569 2134 : commitTsShared->xidLastCommit = InvalidTransactionId;
570 2134 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
571 2134 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
572 2134 : commitTsShared->commitTsActive = false;
573 : }
574 : else
575 : Assert(found);
576 2134 : }
577 :
578 : /*
579 : * GUC check_hook for commit_timestamp_buffers; delegates to check_slru_buffers. The extra and source arguments are unused.
580 : */
581 : bool
582 4334 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
583 : {
584 4334 : return check_slru_buffers("commit_timestamp_buffers", newval);
585 : }
586 :
587 : /*
588 : * This function must be called ONCE on system install. It is currently an intentionally empty placeholder.
589 : *
590 : * (The CommitTs directory is assumed to have been created by initdb, and
591 : * CommitTsShmemInit must have been called already.)
592 : */
593 : void
594 102 : BootStrapCommitTs(void)
595 : {
596 : /*
597 : * Nothing to do here at present, unlike most other SLRU modules; segments
598 : * are created when the server is started with this module enabled. See
599 : * ActivateCommitTs.
600 : */
601 102 : }
602 :
603 : /*
604 : * This must be called ONCE during postmaster or standalone-backend startup,
605 : * after StartupXLOG has initialized TransamVariables->nextXid. It unconditionally calls ActivateCommitTs (which reads nextXid); deciding whether to call this at all is left to the caller.
606 : */
607 : void
608 26 : StartupCommitTs(void)
609 : {
610 26 : ActivateCommitTs();
611 26 : }
612 :
613 : /*
614 : * This must be called ONCE during postmaster or standalone-backend startup,
615 : * after recovery has finished. It brings the module's state in line with the local track_commit_timestamp GUC.
616 : */
617 : void
618 1728 : CompleteCommitTsInitialization(void)
619 : {
620 : /*
621 : * If the feature is not enabled, turn it off for good. This also removes
622 : * any leftover data.
623 : *
624 : * Conversely, we activate the module if the feature is enabled. This is
625 : * necessary for primary and standby as the activation depends on the
626 : * control file contents at the beginning of recovery or when a
627 : * XLOG_PARAMETER_CHANGE is replayed.
628 : */
629 1728 : if (!track_commit_timestamp)
630 1690 : DeactivateCommitTs();
631 : else
632 38 : ActivateCommitTs();
633 1728 : }
634 :
635 : /*
636 : * Activate or deactivate CommitTs upon reception of a XLOG_PARAMETER_CHANGE
637 : * XLog record during recovery. newvalue is the setting carried by the record; oldvalue is not examined by this function.
638 : */
639 : void
640 64 : CommitTsParameterChange(bool newvalue, bool oldvalue)
641 : {
642 : /*
643 : * If the commit_ts module is disabled in this server and we get word from
644 : * the primary server that it is enabled there, activate it so that we can
645 : * replay future WAL records involving it; also mark it as active on
646 : * pg_control. If the old value was already set, we already did this, so
647 : * don't do anything.
648 : *
649 : * If the module is disabled in the primary, disable it here too, unless
650 : * the module is enabled locally.
651 : *
652 : * Note this only runs in the recovery process, so an unlocked read is
653 : * fine.
654 : *
655 : * NOTE(review): the code below tests only commitTsActive; it consults neither oldvalue nor the local GUC, and does not touch pg_control itself -- confirm the wording above against callers.
656 : */
655 64 : if (newvalue)
656 : {
657 4 : if (!commitTsShared->commitTsActive)
658 0 : ActivateCommitTs();
659 : }
660 60 : else if (commitTsShared->commitTsActive)
661 2 : DeactivateCommitTs();
662 64 : }
663 :
664 : /*
665 : * Activate this module whenever necessary.
666 : * This must happen during postmaster or standalone-backend startup,
667 : * or during WAL replay anytime the track_commit_timestamp setting is
668 : * changed in the primary.
669 : *
670 : * The reason why this SLRU needs separate activation/deactivation functions is
671 : * that it can be enabled/disabled during start and the activation/deactivation
672 : * on the primary is propagated to the standby via replay. Other SLRUs don't
673 : * have this property and they can be just initialized during normal startup.
674 : *
675 : * This is in charge of creating the currently active segment, if it's not
676 : * already there. The reason for this is that the server might have been
677 : * running with this module disabled for a while and thus might have skipped
678 : * the normal creation point. The active flag is set last, after the page is known to exist.
679 : */
680 : static void
681 64 : ActivateCommitTs(void)
682 : {
683 : TransactionId xid;
684 : int64 pageno;
685 :
686 : /*
687 : * During bootstrap, we should not register commit timestamps so skip the
688 : * activation in this case.
689 : */
690 64 : if (IsBootstrapProcessingMode())
691 4 : return;
692 :
693 : /* If we've done this already, there's nothing to do */
694 60 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
695 60 : if (commitTsShared->commitTsActive)
696 : {
697 10 : LWLockRelease(CommitTsLock);
698 10 : return;
699 : }
700 50 : LWLockRelease(CommitTsLock);
701 : /* the page of interest is the one holding the next xid to be assigned */
702 50 : xid = XidFromFullTransactionId(TransamVariables->nextXid);
703 50 : pageno = TransactionIdToCTsPage(xid);
704 :
705 : /*
706 : * Re-Initialize our idea of the latest page number.
707 : */
708 50 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
709 :
710 : /*
711 : * If CommitTs is enabled, but it wasn't in the previous server run, we
712 : * need to set the oldest and newest values to the next Xid; that way, we
713 : * will not try to read data that might not have been set.
714 : *
715 : * XXX does this have a problem if a server is started with commitTs
716 : * enabled, then started with commitTs disabled, then restarted with it
717 : * enabled again? It doesn't look like it does, because there should be a
718 : * checkpoint that sets the value to InvalidTransactionId at end of
719 : * recovery; and so any chance of injecting new transactions without
720 : * CommitTs values would occur after the oldestCommitTsXid has been set to
721 : * Invalid temporarily.
722 : */
723 50 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
724 50 : if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
725 : {
726 28 : TransamVariables->oldestCommitTsXid =
727 28 : TransamVariables->newestCommitTsXid = ReadNextTransactionId();
728 : }
729 50 : LWLockRelease(CommitTsLock);
730 :
731 : /* Create the current segment file, if necessary */
732 50 : if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
733 24 : SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
734 :
735 : /* Change the activation status in shared memory. */
736 50 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
737 50 : commitTsShared->commitTsActive = true;
738 50 : LWLockRelease(CommitTsLock);
739 : }
740 :
741 : /*
742 : * Deactivate this module.
743 : *
744 : * This must be called when the track_commit_timestamp parameter is turned off.
745 : * This happens during postmaster or standalone-backend startup, or during WAL
746 : * replay.
747 : *
748 : * Resets CommitTs into invalid state to make sure we don't hand back
749 : * possibly-invalid data; also removes segments of old data. All of this is done while holding CommitTsLock exclusively.
750 : */
751 : static void
752 1692 : DeactivateCommitTs(void)
753 : {
754 : /*
755 : * Cleanup the status in the shared memory.
756 : *
757 : * We reset everything in the commitTsShared record to prevent user from
758 : * getting confusing data about last committed transaction on the standby
759 : * when the module was activated repeatedly on the primary.
760 : */
761 1692 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
762 :
763 1692 : commitTsShared->commitTsActive = false;
764 1692 : commitTsShared->xidLastCommit = InvalidTransactionId;
765 1692 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
766 1692 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
767 : /* an invalid oldest/newest pair is what readers and SetCommitTsLimit treat as "disabled" */
768 1692 : TransamVariables->oldestCommitTsXid = InvalidTransactionId;
769 1692 : TransamVariables->newestCommitTsXid = InvalidTransactionId;
770 :
771 : /*
772 : * Remove *all* files. This is necessary so that there are no leftover
773 : * files; in the case where this feature is later enabled after running
774 : * with it disabled for some time there may be a gap in the file sequence.
775 : * (We can probably tolerate out-of-sequence files, as they are going to
776 : * be overwritten anyway when we wrap around, but it seems better to be
777 : * tidy.)
778 : *
779 : * Note that we do this with CommitTsLock acquired in exclusive mode. This
780 : * is very heavy-handed, but since this routine can only be called in the
781 : * replica and should happen very rarely, we don't worry too much about
782 : * it. Note also that no process should be consulting this SLRU if we
783 : * have just deactivated it.
784 : *
785 : * NOTE(review): "only in the replica" looks stale -- this is also called from CompleteCommitTsInitialization at startup.
786 : */
785 1692 : (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
786 :
787 1692 : LWLockRelease(CommitTsLock);
788 1692 : }
789 :
790 : /*
791 : * Perform a checkpoint --- either during shutdown, or on-the-fly. This runs unconditionally; it does not test whether the module is active.
792 : */
793 : void
794 3366 : CheckPointCommitTs(void)
795 : {
796 : /*
797 : * Write dirty CommitTs pages to disk. This may result in sync requests
798 : * queued for later handling by ProcessSyncRequests(), as part of the
799 : * checkpoint.
800 : */
801 3366 : SimpleLruWriteAll(CommitTsCtl, true);
802 3366 : }
803 :
804 : /*
805 : * Make sure that CommitTs has room for a newly-allocated XID, by zeroing (and WAL-logging) a new page when newestXact is the first XID on it.
806 : *
807 : * NB: this is called while holding XidGenLock. We want it to be very fast
808 : * most of the time; even when it's not so fast, no actual I/O need happen
809 : * unless we're forced to write out a dirty CommitTs or xlog page to make room
810 : * in shared memory.
811 : *
812 : * NB: the current implementation relies on track_commit_timestamp being
813 : * PGC_POSTMASTER.
814 : */
815 : void
816 49000392 : ExtendCommitTs(TransactionId newestXact)
817 : {
818 : int64 pageno;
819 : LWLock *lock;
820 :
821 : /*
822 : * Nothing to do if module not enabled. Note we do an unlocked read of
823 : * the flag here, which is okay because this routine is only called from
824 : * GetNewTransactionId, which is never called in a standby.
825 : */
826 : Assert(!InRecovery);
827 49000392 : if (!commitTsShared->commitTsActive)
828 48998706 : return;
829 :
830 : /*
831 : * No work except at first XID of a page. But beware: just after
832 : * wraparound, the first XID of page zero is FirstNormalTransactionId.
833 : */
834 1686 : if (TransactionIdToCTsEntry(newestXact) != 0 &&
835 : !TransactionIdEquals(newestXact, FirstNormalTransactionId))
836 1684 : return;
837 :
838 2 : pageno = TransactionIdToCTsPage(newestXact);
839 :
840 2 : lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
841 :
842 2 : LWLockAcquire(lock, LW_EXCLUSIVE);
843 :
844 : /* Zero the page ... */
845 2 : SimpleLruZeroPage(CommitTsCtl, pageno);
846 :
847 : /* and make a WAL entry about that, unless we're in REDO (NOTE(review): given the Assert(!InRecovery) above, this test looks always true here) */
848 2 : if (!InRecovery)
849 2 : XLogSimpleInsertInt64(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE, pageno);
850 :
851 2 : LWLockRelease(lock);
852 : }
853 :
854 : /*
855 : * Remove all CommitTs segments before the one holding the passed
856 : * transaction ID. A truncate WAL record is written before any segment is removed; nothing is written when there is nothing to remove.
857 : *
858 : * Note that we don't need to flush XLOG here.
859 : */
860 : void
861 2390 : TruncateCommitTs(TransactionId oldestXact)
862 : {
863 : int64 cutoffPage;
864 :
865 : /*
866 : * The cutoff point is the start of the segment containing oldestXact. We
867 : * pass the *page* containing oldestXact to SimpleLruTruncate.
868 : */
869 2390 : cutoffPage = TransactionIdToCTsPage(oldestXact);
870 :
871 : /* Check to see if there's any files that could be removed */
872 2390 : if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
873 : &cutoffPage))
874 2390 : return; /* nothing to remove */
875 :
876 : /* Write XLOG record */
877 0 : WriteTruncateXlogRec(cutoffPage, oldestXact);
878 :
879 : /* Now we can remove the old CommitTs segment(s) */
880 0 : SimpleLruTruncate(CommitTsCtl, cutoffPage);
881 : }
882 :
883 : /*
884 : * Set the limit values between which commit TS can be consulted. If limits are already set, they are only narrowed (oldest moved forward, newest moved back); otherwise both are set to the given values.
885 : */
886 : void
887 1948 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
888 : {
889 : /*
890 : * Be careful not to overwrite values that are either further into the
891 : * "future" or signal a disabled committs.
892 : */
893 1948 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
894 1948 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
895 : {
896 0 : if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
897 0 : TransamVariables->oldestCommitTsXid = oldestXact;
898 0 : if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
899 0 : TransamVariables->newestCommitTsXid = newestXact;
900 : }
901 : else
902 : {
903 : Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
904 1948 : TransamVariables->oldestCommitTsXid = oldestXact;
905 1948 : TransamVariables->newestCommitTsXid = newestXact;
906 : }
907 1948 : LWLockRelease(CommitTsLock);
908 1948 : }
909 :
910 : /*
911 : * Move forwards the oldest commitTS value that can be consulted. Does nothing if the current oldest value is invalid, or if oldestXact does not follow it.
912 : */
913 : void
914 2390 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
915 : {
916 2390 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
917 2392 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
918 2 : TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
919 2 : TransamVariables->oldestCommitTsXid = oldestXact;
920 2390 : LWLockRelease(CommitTsLock);
921 2390 : }
922 :
923 :
924 : /*
925 : * Decide whether a commitTS page number is "older" for truncation purposes.
926 : * Analogous to CLOGPagePrecedes(). Installed as CommitTsCtl->PagePrecedes in CommitTsShmemInit.
927 : *
928 : * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
929 : * introduces differences compared to CLOG and the other SLRUs having (1 <<
930 : * 31) % per_page == 0. This function never tests exactly
931 : * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
932 : * there are two possible counts of page boundaries between oldestXact and the
933 : * latest XID assigned, depending on whether oldestXact is within the first
934 : * 128 entries of its page. Since this function doesn't know the location of
935 : * oldestXact within page2, it returns false for one page that actually is
936 : * expendable. This is a wider (yet still negligible) version of the
937 : * truncation opportunity that CLOGPagePrecedes() cannot recognize.
938 : *
939 : * For the sake of a worked example, number entries with decimal values such
940 : * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
941 : * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
942 : * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
943 : * because entry=2.85 is the border that toggles whether entries precede the
944 : * last entry of the oldestXact page. While page 2 is expendable at
945 : * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
946 : */
947 : static bool
948 2 : CommitTsPagePrecedes(int64 page1, int64 page2)
949 : {
950 : TransactionId xid1;
951 : TransactionId xid2;
952 : /* build one representative xid per page: the page's first xid plus FirstNormalTransactionId + 1 */
953 2 : xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
954 2 : xid1 += FirstNormalTransactionId + 1;
955 2 : xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
956 2 : xid2 += FirstNormalTransactionId + 1;
957 : /* page1 precedes only if xid1 precedes both xid2 and the xid one page minus one entry after it */
958 2 : return (TransactionIdPrecedes(xid1, xid2) &&
959 0 : TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
960 : }
961 :
962 :
963 : /*
964 : * Write a TRUNCATE xlog record carrying the cutoff page number and the oldest xid; commit_ts_redo consumes both fields.
965 : */
966 : static void
967 0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
968 : {
969 : xl_commit_ts_truncate xlrec;
970 :
971 0 : xlrec.pageno = pageno;
972 0 : xlrec.oldestXid = oldestXid;
973 :
974 0 : XLogBeginInsert();
975 0 : XLogRegisterData(&xlrec, SizeOfCommitTsTruncate);
976 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE); /* the returned value is deliberately ignored */
977 0 : }
978 :
979 : /*
980 : * CommitTS resource manager's routines. commit_ts_redo replays COMMIT_TS_ZEROPAGE and COMMIT_TS_TRUNCATE records and PANICs on any other op code.
981 : */
982 : void
983 0 : commit_ts_redo(XLogReaderState *record)
984 : {
985 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
986 :
987 : /* Backup blocks are not used in commit_ts records */
988 : Assert(!XLogRecHasAnyBlockRefs(record));
989 :
990 0 : if (info == COMMIT_TS_ZEROPAGE)
991 : {
992 : int64 pageno;
993 : /* the record payload is the int64 page number; copy it out rather than dereferencing the record data in place */
994 0 : memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
995 0 : SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
996 : }
997 0 : else if (info == COMMIT_TS_TRUNCATE)
998 : {
999 0 : xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
1000 :
1001 0 : AdvanceOldestCommitTsXid(trunc->oldestXid);
1002 :
1003 : /*
1004 : * During XLOG replay, latest_page_number isn't set up yet; insert a
1005 : * suitable value to bypass the sanity test in SimpleLruTruncate.
1006 : */
1007 0 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1008 0 : trunc->pageno);
1009 :
1010 0 : SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1011 : }
1012 : else
1013 0 : elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1014 0 : }
1015 :
1016 : /*
1017 : * Entrypoint for sync.c to sync commit_ts files; forwards to SlruSyncFileTag for the CommitTs SLRU and returns its result.
1018 : */
1019 : int
1020 0 : committssyncfiletag(const FileTag *ftag, char *path)
1021 : {
1022 0 : return SlruSyncFileTag(CommitTsCtl, ftag, path);
1023 : }
|