PostgreSQL Source Code git master
xlogwait.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * xlogwait.c
4 * Implements waiting for WAL operations to reach specific LSNs.
5 *
6 * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/access/transam/xlogwait.c
10 *
11 * NOTES
12 * This file implements waiting for WAL operations to reach specific LSNs
13 * on both physical standby and primary servers. The core idea is simple:
14 * every process that wants to wait publishes the LSN it needs to the
15 * shared memory, and the appropriate process (startup on standby,
16 * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 * once that LSN has been reached.
18 *
19 * The shared memory used by this module comprises a procInfos
20 * per-backend array with the information of the awaited LSN for each
21 * of the backend processes. The elements of that array are organized
22 * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 * allows for very fast finding of the least awaited LSN for each type.
24 *
25 * In addition, the least-awaited LSN for each type is cached in the
26 * minWaitedLSN array. The waiter process publishes information about
27 * itself to the shared memory and waits on the latch until it is woken
28 * up by the appropriate process, standby is promoted, or the postmaster
29 * dies. Then, it cleans up the information about itself in the shared memory.
30 *
31 * On standby servers:
32 * - After replaying a WAL record, the startup process performs a fast
33 * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 * whose awaited LSNs are reached.
36 * - After receiving WAL, the walreceiver process performs similar checks
37 * against the flush and write LSNs, waking up waiters in the FLUSH
38 * and WRITE heaps, respectively.
39 *
40 * On primary servers: After flushing WAL, the WAL writer or backend
41 * process performs a similar check against the flush LSN and wakes up
42 * waiters whose target flush LSNs have been reached.
43 *
44 *-------------------------------------------------------------------------
45 */
46
47#include "postgres.h"
48
49#include <float.h>
50#include <math.h>
51
52#include "access/xlog.h"
53#include "access/xlogrecovery.h"
54#include "access/xlogwait.h"
55#include "miscadmin.h"
56#include "pgstat.h"
58#include "storage/latch.h"
59#include "storage/proc.h"
60#include "storage/shmem.h"
61#include "utils/fmgrprotos.h"
62#include "utils/pg_lsn.h"
63#include "utils/snapmgr.h"
64
65
66static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
67 void *arg);
68
70
71/*
72 * Wait event for each WaitLSNType, used with WaitLatch() to report
73 * the wait in pg_stat_activity.
 *
 * The table is indexed directly by WaitLSNType through designated
 * initializers. Both WAIT_LSN_TYPE_STANDBY_FLUSH and
 * WAIT_LSN_TYPE_PRIMARY_FLUSH deliberately report the same event,
 * WAIT_EVENT_WAIT_FOR_WAL_FLUSH.
74 */
75static const uint32 WaitLSNWaitEvents[] = {
76 [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
77 [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
78 [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
79 [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
80};
81
/*
 * Compile-time guard: lengthof(WaitLSNWaitEvents) must equal
 * WAIT_LSN_TYPE_COUNT, so adding a WaitLSNType without a matching wait
 * event fails the build instead of reading past the table in WaitForLSN().
 */
83 "WaitLSNWaitEvents must match WaitLSNType enum");
84
85/*
86 * Get the current LSN for the specified wait type.
 *
 * This is the position that WaitForLSN() compares a waiter's target LSN
 * against. Depending on lsnType, it comes from one of the following:
 * - GetXLogReplayRecPtr(), for replay progress.
 * - GetWalRcvWriteRecPtr() or GetWalRcvFlushRecPtr(), for walreceiver
 *   write or flush progress.
 * - GetFlushRecPtr(), for local WAL flush progress.
 * NOTE(review): the case labels are presumably, in order,
 * WAIT_LSN_TYPE_STANDBY_REPLAY, _STANDBY_WRITE, _STANDBY_FLUSH and
 * _PRIMARY_FLUSH — confirm.
 *
 * An lsnType outside [0, WAIT_LSN_TYPE_COUNT) trips the Assert in
 * assert-enabled builds and otherwise reaches the elog(ERROR) below.
87 */
90{
91 Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
92
/* Each arm returns directly; there is no default, so control only
 * leaves the switch for an unrecognized lsnType. */
93 switch (lsnType)
94 {
96 return GetXLogReplayRecPtr(NULL);
97
99 return GetWalRcvWriteRecPtr();
100
102 return GetWalRcvFlushRecPtr(NULL, NULL);
103
105 return GetFlushRecPtr(NULL);
106 }
107
108 elog(ERROR, "invalid LSN wait type: %d", lsnType);
110}
111
112/* Report the amount of shared memory space needed for WaitLSNState. */
113Size
115{
116 Size size;
117
/* Fixed part: every field that precedes the flexible procInfos array. */
118 size = offsetof(WaitLSNState, procInfos);
/*
 * NOTE(review): the per-process procInfos slots are presumably added here.
 * The expected sizing is (MaxBackends + NUM_AUXILIARY_PROCS) *
 * sizeof(WaitLSNProcInfo), computed with add_size()/mul_size() so that
 * overflow is reported — confirm. WaitLSNShmemInit() zeroes procInfos and
 * must agree with this sizing.
 */
120 return size;
121}
122
123/* Initialize the WaitLSNState in the shared memory. */
/*
 * ShmemInitStruct() either creates the "WaitLSNState" segment or attaches
 * to an existing one. Its contents are initialized only when the segment
 * was just created (found == false).
 */
124void
126{
127 bool found;
128
129 waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
131 &found);
132 if (!found)
133 {
134 int i;
135
136 /* Initialize heaps and tracking */
/*
 * NOTE(review): for each wait type this presumably sets minWaitedLSN[i]
 * to a "no waiters" sentinel (PG_UINT64_MAX, via pg_atomic_init_u64()).
 * The sentinel makes WaitLSNWakeup()'s fast-path check always succeed
 * while the heap is empty. It also presumably initializes waitersHeap[i]
 * with waitlsn_cmp. Confirm both.
 */
137 for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
138 {
141 }
142
143 /* Initialize process info array */
/* Zero-filling leaves every slot with inHeap == false. */
144 memset(&waitLSNState->procInfos, 0,
146 }
147}
148
149/*
150 * Comparison function for LSN waiters heaps. Waiting processes are ordered by
151 * LSN, so that the waiter with smallest LSN is at the top.
 *
 * The result is intentionally inverted: a waiter with a smaller waitLSN
 * compares as "greater" (returns 1). PostgreSQL's pairing heap keeps the
 * greatest node at the top, so this inversion makes pairingheap_first()
 * return the smallest awaited LSN. The arg parameter appears unused.
152 */
153static int
155{
158
159 if (aproc->waitLSN < bproc->waitLSN)
160 return 1;
161 else if (aproc->waitLSN > bproc->waitLSN)
162 return -1;
163 else
164 return 0;
165}
166
167/*
168 * Update minimum waited LSN for the specified LSN type
 *
 * Publishes the waitLSN of the top of waitersHeap[lsnType] into the
 * minWaitedLSN cache, which WaitLSNWakeup() reads for its fast path.
 * NOTE(review): when the heap is empty, a "no waiters" sentinel
 * (presumably PG_UINT64_MAX) is published instead — confirm.
 *
 * Every caller visible in this file holds WaitLSNLock in LW_EXCLUSIVE
 * mode, so the heap cannot change underneath this function.
169 */
170static void
172{
174 int i = (int) lsnType;
175
176 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
177
179 {
/* The heap top is the waiter with the smallest awaited LSN. */
181 WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
182
183 minWaitedLSN = procInfo->waitLSN;
184 }
186}
187
188/*
189 * Add current process to appropriate waiters heap based on LSN type
 *
 * Under WaitLSNLock, this records the process number, target LSN and wait
 * type in the process's WaitLSNProcInfo slot, inserts the slot into the
 * heap, marks it inHeap, and refreshes the cached minimum. The process
 * must not already be registered (asserted).
 * NOTE(review): procInfo is presumably this process's own slot,
 * procInfos[MyProcNumber] — confirm.
190 */
191static void
193{
195 int i = (int) lsnType;
196
197 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
198
199 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
200
201 procInfo->procno = MyProcNumber;
202 procInfo->waitLSN = lsn;
203 procInfo->lsnType = lsnType;
204
205 Assert(!procInfo->inHeap);
207 procInfo->inHeap = true;
/* The new waiter may now be the smallest; keep the fast-path cache exact. */
208 updateMinWaitedLSN(lsnType);
209
210 LWLockRelease(WaitLSNLock);
211}
212
213/*
214 * Remove current process from appropriate waiters heap based on LSN type
 *
 * It is safe to call this function after a waker has already removed the
 * process: wakeupWaiters() clears inHeap under WaitLSNLock, so the
 * inHeap recheck below skips the heap removal in that case. lsnType must
 * match the type the process registered with (asserted).
215 */
216static void
218{
220 int i = (int) lsnType;
221
222 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
223
224 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
225
226 Assert(procInfo->lsnType == lsnType);
227
/* Recheck under the lock; a waker may have dequeued us concurrently. */
228 if (procInfo->inHeap)
229 {
231 procInfo->inHeap = false;
232 updateMinWaitedLSN(lsnType);
233 }
234
235 LWLockRelease(WaitLSNLock);
236}
237
/*
 * Capacity of the on-stack ProcNumber buffer that wakeupWaiters() fills
 * during each pass over a waiters heap.  Waiters beyond this many are
 * handled by further passes, so the value only trades stack space against
 * the number of WaitLSNLock acquisitions; one pass usually suffices.
 */
#define WAKEUP_PROC_STATIC_ARRAY_SIZE 16
243
244/*
245 * Remove waiters whose LSN has been reached from the heap and set their
246 * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
247 * and set latches for all waiters.
248 *
249 * This function first accumulates waiters to wake up into an array, then
250 * wakes them up without holding a WaitLSNLock. The array size is static and
251 * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
252 * to wake up all the waiters at once in the vast majority of cases. However,
253 * if there are more waiters, this function will loop to process them in
254 * multiple chunks.
255 */
256static void
258{
260 int numWakeUpProcs;
261 int i = (int) lsnType;
262
263 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
264
265 do
266 {
267 numWakeUpProcs = 0;
268 LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
269
270 /*
271 * Iterate the waiters heap until we find LSN not yet reached. Record
272 * process numbers to wake up, but send wakeups after releasing lock.
273 */
275 {
277 WaitLSNProcInfo *procInfo;
278
279 /* Get procInfo using appropriate heap node */
280 procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
281
282 if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
283 break;
284
285 Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
286 wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
288
289 /* Update appropriate flag */
290 procInfo->inHeap = false;
291
292 if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
293 break;
294 }
295
296 updateMinWaitedLSN(lsnType);
297 LWLockRelease(WaitLSNLock);
298
299 /*
300 * Set latches for processes whose waited LSNs have been reached.
301 * Since SetLatch() is a time-consuming operation, we do this outside
302 * of WaitLSNLock. This is safe because procLatch is never freed, so
303 * at worst we may set a latch for the wrong process or for no process
304 * at all, which is harmless.
305 */
306 for (i = 0; i < numWakeUpProcs; i++)
307 SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
308
309 } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
310}
311
312/*
313 * Wake up processes waiting for LSN to reach currentLSN
 *
 * Wakes every waiter of lsnType whose target LSN is <= currentLSN. If
 * currentLSN is InvalidXLogRecPtr, it wakes all waiters of that type.
 * The common case, where nobody is waiting or nobody is satisfied yet, is
 * answered from the minWaitedLSN cache without touching the heap.
314 */
315void
317{
318 int i = (int) lsnType;
319
320 Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
321
322 /*
323 * Fast path: return early when the cached minimum awaited LSN for this
324 * type is still beyond currentLSN, so no waiter can be satisfied yet.
 * The fast path is bypassed when currentLSN is InvalidXLogRecPtr, which
 * means "wake all waiters" (e.g., during promotion when recovery ends).
325 */
326 if (XLogRecPtrIsValid(currentLSN) &&
328 return;
329
330 wakeupWaiters(lsnType, currentLSN);
331}
332
333/*
334 * Clean up LSN waiters for exiting process
 *
 * If the current process is still registered in a waiters heap, remove it.
 * This leaves neither a stale heap entry nor a stale minWaitedLSN value in
 * shared memory. The function does nothing when waitLSNState has not been
 * set up. NOTE(review): presumably called from process-exit cleanup —
 * confirm the caller.
335 */
336void
338{
339 if (waitLSNState)
340 {
341 /*
342 * We do a fast-path check of the inHeap flag without the lock. This
343 * flag is set to true only by the process itself. So, it's only
344 * possible to get a false positive. But that will be eliminated by a
345 * recheck inside deleteLSNWaiter().
346 */
349 }
350}
351
352/*
353 * Check if the given LSN type requires recovery to be in progress.
354 * Standby wait types (replay, write, flush) require recovery;
355 * primary wait types (flush) do not.
 *
 * NOTE(review): presumably WaitForLSN() uses this to decide whether the
 * end of recovery (promotion) should terminate the wait — confirm.
356 */
357static inline bool
359{
360 return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
363}
364
365/*
366 * Wait using MyLatch till the given LSN is reached, the replica gets
367 * promoted, or the postmaster dies.
 *
 * lsnType selects which position is compared to targetLSN (see
 * GetCurrentLSNForWaitType()). If timeout is greater than 0, it is a limit
 * in milliseconds. If timeout is 0 or less, there is no time limit.
 * Postmaster death raises FATAL instead of returning.
368 *
369 * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
370 * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if a standby wait type was
371 * requested while not in recovery, or the replica was promoted before the
 * target LSN was reached.
 * Returns WAIT_LSN_RESULT_TIMEOUT if the timeout expired first.
372 */
374WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
375{
376 XLogRecPtr currentLSN;
377 TimestampTz endtime = 0;
378 int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
379
380 /* Shouldn't be called when shmem isn't initialized */
382
383 /* Should have a valid proc number */
385
/* Only arm WL_TIMEOUT when a finite timeout was requested. */
386 if (timeout > 0)
387 {
389 wake_events |= WL_TIMEOUT;
390 }
391
392 /*
393 * Add our process to the waiters heap. It might happen that target LSN
394 * gets reached before we do. The check at the beginning of the loop
395 * below prevents the race condition.
396 */
397 addLSNWaiter(targetLSN, lsnType);
398
399 for (;;)
400 {
401 int rc;
/* -1 is passed to WaitLatch() only when WL_TIMEOUT is not armed. */
402 long delay_ms = -1;
403
404 /* Get current LSN for the wait type */
405 currentLSN = GetCurrentLSNForWaitType(lsnType);
406
407 /* Check that recovery is still in-progress */
409 {
410 /*
411 * Recovery was ended, but check if target LSN was already
412 * reached.
413 */
414 deleteLSNWaiter(lsnType);
415
416 if (PromoteIsTriggered() && targetLSN <= currentLSN)
419 }
420 else
421 {
422 /* Check if the waited LSN has been reached */
423 if (targetLSN <= currentLSN)
424 break;
425 }
426
/* Recompute the remaining time on every iteration; give up once it is spent. */
427 if (timeout > 0)
428 {
430 if (delay_ms <= 0)
431 break;
432 }
433
435
436 rc = WaitLatch(MyLatch, wake_events, delay_ms,
437 WaitLSNWaitEvents[lsnType]);
438
439 /*
440 * Emergency bailout if postmaster has died. This is to avoid the
441 * necessity for manual cleanup of all postmaster children.
442 */
443 if (rc & WL_POSTMASTER_DEATH)
445 errcode(ERRCODE_ADMIN_SHUTDOWN),
446 errmsg("terminating connection due to unexpected postmaster exit"),
447 errcontext("while waiting for LSN"));
448
449 if (rc & WL_LATCH_SET)
451 }
452
453 /*
454 * Delete our process from the shared memory heap. We might already have
455 * been removed by the process that woke us (startup, walreceiver, WAL
456 * writer or backend); the 'inHeap' flag prevents a double deletion.
457 */
458 deleteLSNWaiter(lsnType);
459
460 /*
461 * If we didn't reach the target LSN, we must have exited due to the timeout.
462 */
463 if (targetLSN > currentLSN)
465
467}
static void pg_atomic_write_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:485
static void pg_atomic_init_u64(volatile pg_atomic_uint64 *ptr, uint64 val)
Definition: atomics.h:453
static uint64 pg_atomic_read_u64(volatile pg_atomic_uint64 *ptr)
Definition: atomics.h:467
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1757
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
int64_t int64
Definition: c.h:549
#define pg_unreachable()
Definition: c.h:347
uint32_t uint32
Definition: c.h:552
#define lengthof(array)
Definition: c.h:801
#define PG_UINT64_MAX
Definition: c.h:612
size_t Size
Definition: c.h:624
int64 TimestampTz
Definition: timestamp.h:39
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define errcontext
Definition: elog.h:198
#define FATAL
Definition: elog.h:41
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
ProcNumber MyProcNumber
Definition: globals.c:90
int MaxBackends
Definition: globals.c:146
struct Latch * MyLatch
Definition: globals.c:63
Assert(PointerIsAligned(start, uint64))
int b
Definition: isn.c:74
int a
Definition: isn.c:73
int i
Definition: isn.c:77
void SetLatch(Latch *latch)
Definition: latch.c:290
void ResetLatch(Latch *latch)
Definition: latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1178
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1898
@ LW_EXCLUSIVE
Definition: lwlock.h:112
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
void pairingheap_remove(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:184
void pairingheap_add(pairingheap *heap, pairingheap_node *node)
Definition: pairingheap.c:126
void pairingheap_initialize(pairingheap *heap, pairingheap_comparator compare, void *arg)
Definition: pairingheap.c:60
pairingheap_node * pairingheap_remove_first(pairingheap *heap)
Definition: pairingheap.c:159
pairingheap_node * pairingheap_first(pairingheap *heap)
Definition: pairingheap.c:144
#define pairingheap_is_empty(h)
Definition: pairingheap.h:99
#define pairingheap_container(type, membername, ptr)
Definition: pairingheap.h:43
#define pairingheap_const_container(type, membername, ptr)
Definition: pairingheap.h:51
void * arg
#define NUM_AUXILIARY_PROCS
Definition: proc.h:463
#define GetPGProcByNumber(n)
Definition: proc.h:440
int ProcNumber
Definition: procnumber.h:24
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
ProcNumber procno
Definition: xlogwait.h:63
pairingheap_node heapNode
Definition: xlogwait.h:72
XLogRecPtr waitLSN
Definition: xlogwait.h:57
WaitLSNType lsnType
Definition: xlogwait.h:60
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]
Definition: xlogwait.h:97
pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT]
Definition: xlogwait.h:85
pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT]
Definition: xlogwait.h:91
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define WL_POSTMASTER_DEATH
Definition: waiteventset.h:38
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
XLogRecPtr GetWalRcvWriteRecPtr(void)
bool RecoveryInProgress(void)
Definition: xlog.c:6461
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6626
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
uint64 XLogRecPtr
Definition: xlogdefs.h:21
bool PromoteIsTriggered(void)
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)
void WaitLSNShmemInit(void)
Definition: xlogwait.c:125
static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
Definition: xlogwait.c:154
void WaitLSNCleanup(void)
Definition: xlogwait.c:337
#define WAKEUP_PROC_STATIC_ARRAY_SIZE
Definition: xlogwait.c:242
static bool WaitLSNTypeRequiresRecovery(WaitLSNType t)
Definition: xlogwait.c:358
struct WaitLSNState * waitLSNState
Definition: xlogwait.c:69
static void updateMinWaitedLSN(WaitLSNType lsnType)
Definition: xlogwait.c:171
static void deleteLSNWaiter(WaitLSNType lsnType)
Definition: xlogwait.c:217
XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType)
Definition: xlogwait.c:89
WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
Definition: xlogwait.c:374
StaticAssertDecl(lengthof(WaitLSNWaitEvents)==WAIT_LSN_TYPE_COUNT, "WaitLSNWaitEvents must match WaitLSNType enum")
static void wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition: xlogwait.c:257
static void addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
Definition: xlogwait.c:192
void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
Definition: xlogwait.c:316
Size WaitLSNShmemSize(void)
Definition: xlogwait.c:114
static const uint32 WaitLSNWaitEvents[]
Definition: xlogwait.c:75
#define WAIT_LSN_TYPE_COUNT
Definition: xlogwait.h:47
WaitLSNResult
Definition: xlogwait.h:26
@ WAIT_LSN_RESULT_NOT_IN_RECOVERY
Definition: xlogwait.h:28
@ WAIT_LSN_RESULT_TIMEOUT
Definition: xlogwait.h:30
@ WAIT_LSN_RESULT_SUCCESS
Definition: xlogwait.h:27
WaitLSNType
Definition: xlogwait.h:37
@ WAIT_LSN_TYPE_PRIMARY_FLUSH
Definition: xlogwait.h:44
@ WAIT_LSN_TYPE_STANDBY_REPLAY
Definition: xlogwait.h:39
@ WAIT_LSN_TYPE_STANDBY_FLUSH
Definition: xlogwait.h:41
@ WAIT_LSN_TYPE_STANDBY_WRITE
Definition: xlogwait.h:40