From 5035172e4ab58e4e8eef1bc60b0712fc59e0be31 Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Thu, 24 Oct 2024 14:37:53 +0300 Subject: Move LSN waiting declarations and definitions to better place 3c5db1d6b implemented the pg_wal_replay_wait() stored procedure. Due to the patch development history, the implementation resided in src/backend/commands/waitlsn.c (src/include/commands/waitlsn.h for headers). 014f9f34d moved pg_wal_replay_wait() itself to src/backend/access/transam/xlogfuncs.c near to the WAL-manipulation functions. But most of the implementation stayed in place. The code in src/backend/commands/waitlsn.c has nothing to do with commands, but is related to WAL. So, this commit moves this code into src/backend/access/transam/xlogwait.c (src/include/access/xlogwait.h for headers). Reported-by: Peter Eisentraut Discussion: https://postgr.es/m/18c0fa64-0475-415e-a1bd-665d922c5201%40eisentraut.org Reviewed-by: Pavel Borisov --- src/backend/access/transam/Makefile | 3 +- src/backend/access/transam/meson.build | 1 + src/backend/access/transam/xact.c | 2 +- src/backend/access/transam/xlog.c | 2 +- src/backend/access/transam/xlogfuncs.c | 2 +- src/backend/access/transam/xlogrecovery.c | 2 +- src/backend/access/transam/xlogwait.c | 338 ++++++++++++++++++++++++++++++ src/backend/commands/Makefile | 3 +- src/backend/commands/meson.build | 1 - src/backend/commands/waitlsn.c | 338 ------------------------------ src/backend/storage/ipc/ipci.c | 2 +- src/backend/storage/lmgr/proc.c | 2 +- src/include/access/xlogwait.h | 81 +++++++ src/include/commands/waitlsn.h | 81 ------- 14 files changed, 429 insertions(+), 429 deletions(-) create mode 100644 src/backend/access/transam/xlogwait.c delete mode 100644 src/backend/commands/waitlsn.c create mode 100644 src/include/access/xlogwait.h delete mode 100644 src/include/commands/waitlsn.h (limited to 'src') diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 661c55a9db7..a32f473e0a2 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -36,7 +36,8 @@ OBJS = \ xlogreader.o \ xlogrecovery.o \ xlogstats.o \ - xlogutils.o + xlogutils.o \ + xlogwait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build index 8a3522557cd..91d258f9df1 100644 --- a/src/backend/access/transam/meson.build +++ b/src/backend/access/transam/meson.build @@ -24,6 +24,7 @@ backend_sources += files( 'xlogrecovery.c', 'xlogstats.c', 'xlogutils.c', + 'xlogwait.c', ) # used by frontend programs to build a frontend xlogreader diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b0b05e28790..d8f6c658420 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,6 +31,7 @@ #include "access/xloginsert.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" @@ -38,7 +39,6 @@ #include "commands/async.h" #include "commands/tablecmds.h" #include "commands/trigger.h" -#include "commands/waitlsn.h" #include "common/pg_prng.h" #include "executor/spi.h" #include "libpq/be-fsstubs.h" diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9102c8d772e..ad9b0b612f4 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -62,11 +62,11 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "catalog/pg_database.h" -#include "commands/waitlsn.h" #include "common/controldata_utils.h" #include "common/file_utils.h" #include "executor/instrument.h" diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index 3e3d2bb6189..cbf84ef7d8f 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -22,8 +22,8 @@ #include "access/xlog_internal.h" #include "access/xlogbackup.h" #include "access/xlogrecovery.h" +#include "access/xlogwait.h" #include "catalog/pg_type.h" -#include "commands/waitlsn.h" #include "funcapi.h" #include "miscadmin.h" #include "pgstat.h" diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 320b14add1a..31caa49d6c3 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -40,10 +40,10 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" -#include "commands/waitlsn.h" #include "common/file_utils.h" #include "miscadmin.h" #include "pgstat.h" diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c new file mode 100644 index 00000000000..eef58ce69ce --- /dev/null +++ b/src/backend/access/transam/xlogwait.c @@ -0,0 +1,338 @@ +/*------------------------------------------------------------------------- + * + * xlogwait.c + * Implements waiting for the given replay LSN, which is used in + * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/xlogwait.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include +#include + +#include "pgstat.h" +#include "access/xlog.h" +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "utils/fmgrprotos.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" +#include "utils/wait_event_types.h" + +static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, + void *arg); + +struct WaitLSNState *waitLSNState = NULL; + +/* Report the amount of shared memory space needed for WaitLSNState. */ +Size +WaitLSNShmemSize(void) +{ + Size size; + + size = offsetof(WaitLSNState, procInfos); + size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); + return size; +} + +/* Initialize the WaitLSNState in the shared memory. */ +void +WaitLSNShmemInit(void) +{ + bool found; + + waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState", + WaitLSNShmemSize(), + &found); + if (!found) + { + pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX); + pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL); + memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); + } +} + +/* + * Comparison function for waitLSN->waitersHeap heap. Waiting processes are + * ordered by lsn, so that the waiter with smallest lsn is at the top. + */ +static int +waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) +{ + const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); + const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); + + if (aproc->waitLSN < bproc->waitLSN) + return 1; + else if (aproc->waitLSN > bproc->waitLSN) + return -1; + else + return 0; +} + +/* + * Update waitLSN->minWaitedLSN according to the current state of + * waitLSN->waitersHeap. + */ +static void +updateMinWaitedLSN(void) +{ + XLogRecPtr minWaitedLSN = PG_UINT64_MAX; + + if (!pairingheap_is_empty(&waitLSNState->waitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); + + minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; + } + + pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN); +} + +/* + * Put the current process into the heap of LSN waiters. + */ +static void +addLSNWaiter(XLogRecPtr lsn) +{ + WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; + + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + Assert(!procInfo->inHeap); + + procInfo->latch = MyLatch; + procInfo->waitLSN = lsn; + + pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode); + procInfo->inHeap = true; + updateMinWaitedLSN(); + + LWLockRelease(WaitLSNLock); +} + +/* + * Remove the current process from the heap of LSN waiters if it's there. + */ +static void +deleteLSNWaiter(void) +{ + WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; + + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + if (!procInfo->inHeap) + { + LWLockRelease(WaitLSNLock); + return; + } + + pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode); + procInfo->inHeap = false; + updateMinWaitedLSN(); + + LWLockRelease(WaitLSNLock); +} + +/* + * Remove waiters whose LSN has been replayed from the heap and set their + * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap + * and set latches for all waiters. + */ +void +WaitLSNSetLatches(XLogRecPtr currentLSN) +{ + int i; + Latch **wakeUpProcLatches; + int numWakeUpProcs = 0; + + wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends); + + LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); + + /* + * Iterate the pairing heap of waiting processes till we find LSN not yet + * replayed. Record the process latches to set them later. + */ + while (!pairingheap_is_empty(&waitLSNState->waitersHeap)) + { + pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); + WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); + + if (!XLogRecPtrIsInvalid(currentLSN) && + procInfo->waitLSN > currentLSN) + break; + + wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch; + (void) pairingheap_remove_first(&waitLSNState->waitersHeap); + procInfo->inHeap = false; + } + + updateMinWaitedLSN(); + + LWLockRelease(WaitLSNLock); + + /* + * Set latches for processes, whose waited LSNs are already replayed. As + * the time consuming operations, we do it this outside of WaitLSNLock. + * This is actually fine because procLatch isn't ever freed, so we just + * can potentially set the wrong process' (or no process') latch. + */ + for (i = 0; i < numWakeUpProcs; i++) + { + SetLatch(wakeUpProcLatches[i]); + } + pfree(wakeUpProcLatches); +} + +/* + * Delete our item from shmem array if any. + */ +void +WaitLSNCleanup(void) +{ + /* + * We do a fast-path check of the 'inHeap' flag without the lock. This + * flag is set to true only by the process itself. So, it's only possible + * to get a false positive. But that will be eliminated by a recheck + * inside deleteLSNWaiter(). + */ + if (waitLSNState->procInfos[MyProcNumber].inHeap) + deleteLSNWaiter(); +} + +/* + * Wait using MyLatch till the given LSN is replayed, the postmaster dies or + * timeout happens. + */ +void +WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) +{ + XLogRecPtr currentLSN; + TimestampTz endtime = 0; + int wake_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; + + /* Shouldn't be called when shmem isn't initialized */ + Assert(waitLSNState); + + /* Should have a valid proc number */ + Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends); + + if (!RecoveryInProgress()) + { + /* + * Recovery is not in progress. Given that we detected this in the + * very first check, this procedure was mistakenly called on primary. + * However, it's possible that standby was promoted concurrently to + * the procedure call, while target LSN is replayed. So, we still + * check the last replay LSN before reporting an error. + */ + if (targetLSN <= GetXLogReplayRecPtr(NULL)) + return; + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for LSN can only be executed during recovery."))); + } + else + { + /* If target LSN is already replayed, exit immediately */ + if (targetLSN <= GetXLogReplayRecPtr(NULL)) + return; + } + + if (timeout > 0) + { + endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); + wake_events |= WL_TIMEOUT; + } + + /* + * Add our process to the pairing heap of waiters. It might happen that + * target LSN gets replayed before we do. Another check at the beginning + * of the loop below prevents the race condition. + */ + addLSNWaiter(targetLSN); + + for (;;) + { + int rc; + long delay_ms = 0; + + /* Recheck that recovery is still in-progress */ + if (!RecoveryInProgress()) + { + /* + * Recovery was ended, but recheck if target LSN was already + * replayed. + */ + currentLSN = GetXLogReplayRecPtr(NULL); + if (targetLSN <= currentLSN) + return; + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.", + LSN_FORMAT_ARGS(targetLSN), + LSN_FORMAT_ARGS(currentLSN)))); + } + else + { + /* Check if the waited LSN has been replayed */ + currentLSN = GetXLogReplayRecPtr(NULL); + if (targetLSN <= currentLSN) + break; + } + + /* + * If the timeout value is specified, calculate the number of + * milliseconds before the timeout. Exit if the timeout is already + * reached. + */ + if (timeout > 0) + { + delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime); + if (delay_ms <= 0) + break; + } + + CHECK_FOR_INTERRUPTS(); + + rc = WaitLatch(MyLatch, wake_events, delay_ms, + WAIT_EVENT_WAIT_FOR_WAL_REPLAY); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * Delete our process from the shared memory pairing heap. We might + * already be deleted by the startup process. The 'inHeap' flag prevents + * us from the double deletion. + */ + deleteLSNWaiter(); + + /* + * If we didn't reach the target LSN, we must be exited by timeout. + */ + if (targetLSN > currentLSN) + { + ereport(ERROR, + (errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", + LSN_FORMAT_ARGS(targetLSN), + LSN_FORMAT_ARGS(currentLSN)))); + } +} diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index cede90c3b98..48f7348f91c 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -61,7 +61,6 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o \ - waitlsn.o + view.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index 7549be5dc3b..6dd00a4abde 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -50,5 +50,4 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', - 'waitlsn.c', ) diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c deleted file mode 100644 index 501938f4330..00000000000 --- a/src/backend/commands/waitlsn.c +++ /dev/null @@ -1,338 +0,0 @@ -/*------------------------------------------------------------------------- - * - * waitlsn.c - * Implements waiting for the given replay LSN, which is used in - * CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). - * - * Copyright (c) 2024, PostgreSQL Global Development Group - * - * IDENTIFICATION - * src/backend/commands/waitlsn.c - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" - -#include -#include - -#include "pgstat.h" -#include "access/xlog.h" -#include "access/xlogrecovery.h" -#include "commands/waitlsn.h" -#include "funcapi.h" -#include "miscadmin.h" -#include "storage/latch.h" -#include "storage/proc.h" -#include "storage/shmem.h" -#include "utils/fmgrprotos.h" -#include "utils/pg_lsn.h" -#include "utils/snapmgr.h" -#include "utils/wait_event_types.h" - -static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, - void *arg); - -struct WaitLSNState *waitLSNState = NULL; - -/* Report the amount of shared memory space needed for WaitLSNState. */ -Size -WaitLSNShmemSize(void) -{ - Size size; - - size = offsetof(WaitLSNState, procInfos); - size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); - return size; -} - -/* Initialize the WaitLSNState in the shared memory. */ -void -WaitLSNShmemInit(void) -{ - bool found; - - waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState", - WaitLSNShmemSize(), - &found); - if (!found) - { - pg_atomic_init_u64(&waitLSNState->minWaitedLSN, PG_UINT64_MAX); - pairingheap_initialize(&waitLSNState->waitersHeap, waitlsn_cmp, NULL); - memset(&waitLSNState->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); - } -} - -/* - * Comparison function for waitLSN->waitersHeap heap. Waiting processes are - * ordered by lsn, so that the waiter with smallest lsn is at the top. - */ -static int -waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) -{ - const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); - const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); - - if (aproc->waitLSN < bproc->waitLSN) - return 1; - else if (aproc->waitLSN > bproc->waitLSN) - return -1; - else - return 0; -} - -/* - * Update waitLSN->minWaitedLSN according to the current state of - * waitLSN->waitersHeap. - */ -static void -updateMinWaitedLSN(void) -{ - XLogRecPtr minWaitedLSN = PG_UINT64_MAX; - - if (!pairingheap_is_empty(&waitLSNState->waitersHeap)) - { - pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); - - minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; - } - - pg_atomic_write_u64(&waitLSNState->minWaitedLSN, minWaitedLSN); -} - -/* - * Put the current process into the heap of LSN waiters. - */ -static void -addLSNWaiter(XLogRecPtr lsn) -{ - WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - Assert(!procInfo->inHeap); - - procInfo->latch = MyLatch; - procInfo->waitLSN = lsn; - - pairingheap_add(&waitLSNState->waitersHeap, &procInfo->phNode); - procInfo->inHeap = true; - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); -} - -/* - * Remove the current process from the heap of LSN waiters if it's there. - */ -static void -deleteLSNWaiter(void) -{ - WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber]; - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - if (!procInfo->inHeap) - { - LWLockRelease(WaitLSNLock); - return; - } - - pairingheap_remove(&waitLSNState->waitersHeap, &procInfo->phNode); - procInfo->inHeap = false; - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); -} - -/* - * Remove waiters whose LSN has been replayed from the heap and set their - * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap - * and set latches for all waiters. - */ -void -WaitLSNSetLatches(XLogRecPtr currentLSN) -{ - int i; - Latch **wakeUpProcLatches; - int numWakeUpProcs = 0; - - wakeUpProcLatches = palloc(sizeof(Latch *) * MaxBackends); - - LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); - - /* - * Iterate the pairing heap of waiting processes till we find LSN not yet - * replayed. Record the process latches to set them later. - */ - while (!pairingheap_is_empty(&waitLSNState->waitersHeap)) - { - pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap); - WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); - - if (!XLogRecPtrIsInvalid(currentLSN) && - procInfo->waitLSN > currentLSN) - break; - - wakeUpProcLatches[numWakeUpProcs++] = procInfo->latch; - (void) pairingheap_remove_first(&waitLSNState->waitersHeap); - procInfo->inHeap = false; - } - - updateMinWaitedLSN(); - - LWLockRelease(WaitLSNLock); - - /* - * Set latches for processes, whose waited LSNs are already replayed. As - * the time consuming operations, we do it this outside of WaitLSNLock. - * This is actually fine because procLatch isn't ever freed, so we just - * can potentially set the wrong process' (or no process') latch. - */ - for (i = 0; i < numWakeUpProcs; i++) - { - SetLatch(wakeUpProcLatches[i]); - } - pfree(wakeUpProcLatches); -} - -/* - * Delete our item from shmem array if any. - */ -void -WaitLSNCleanup(void) -{ - /* - * We do a fast-path check of the 'inHeap' flag without the lock. This - * flag is set to true only by the process itself. So, it's only possible - * to get a false positive. But that will be eliminated by a recheck - * inside deleteLSNWaiter(). - */ - if (waitLSNState->procInfos[MyProcNumber].inHeap) - deleteLSNWaiter(); -} - -/* - * Wait using MyLatch till the given LSN is replayed, the postmaster dies or - * timeout happens. - */ -void -WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout) -{ - XLogRecPtr currentLSN; - TimestampTz endtime = 0; - int wake_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; - - /* Shouldn't be called when shmem isn't initialized */ - Assert(waitLSNState); - - /* Should have a valid proc number */ - Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends); - - if (!RecoveryInProgress()) - { - /* - * Recovery is not in progress. Given that we detected this in the - * very first check, this procedure was mistakenly called on primary. - * However, it's possible that standby was promoted concurrently to - * the procedure call, while target LSN is replayed. So, we still - * check the last replay LSN before reporting an error. - */ - if (targetLSN <= GetXLogReplayRecPtr(NULL)) - return; - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("recovery is not in progress"), - errhint("Waiting for LSN can only be executed during recovery."))); - } - else - { - /* If target LSN is already replayed, exit immediately */ - if (targetLSN <= GetXLogReplayRecPtr(NULL)) - return; - } - - if (timeout > 0) - { - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); - wake_events |= WL_TIMEOUT; - } - - /* - * Add our process to the pairing heap of waiters. It might happen that - * target LSN gets replayed before we do. Another check at the beginning - * of the loop below prevents the race condition. - */ - addLSNWaiter(targetLSN); - - for (;;) - { - int rc; - long delay_ms = 0; - - /* Recheck that recovery is still in-progress */ - if (!RecoveryInProgress()) - { - /* - * Recovery was ended, but recheck if target LSN was already - * replayed. - */ - currentLSN = GetXLogReplayRecPtr(NULL); - if (targetLSN <= currentLSN) - return; - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("recovery is not in progress"), - errdetail("Recovery ended before replaying target LSN %X/%X; last replay LSN %X/%X.", - LSN_FORMAT_ARGS(targetLSN), - LSN_FORMAT_ARGS(currentLSN)))); - } - else - { - /* Check if the waited LSN has been replayed */ - currentLSN = GetXLogReplayRecPtr(NULL); - if (targetLSN <= currentLSN) - break; - } - - /* - * If the timeout value is specified, calculate the number of - * milliseconds before the timeout. Exit if the timeout is already - * reached. - */ - if (timeout > 0) - { - delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime); - if (delay_ms <= 0) - break; - } - - CHECK_FOR_INTERRUPTS(); - - rc = WaitLatch(MyLatch, wake_events, delay_ms, - WAIT_EVENT_WAIT_FOR_WAL_REPLAY); - - if (rc & WL_LATCH_SET) - ResetLatch(MyLatch); - } - - /* - * Delete our process from the shared memory pairing heap. We might - * already be deleted by the startup process. The 'inHeap' flag prevents - * us from the double deletion. - */ - deleteLSNWaiter(); - - /* - * If we didn't reach the target LSN, we must be exited by timeout. - */ - if (targetLSN > currentLSN) - { - ereport(ERROR, - (errcode(ERRCODE_QUERY_CANCELED), - errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", - LSN_FORMAT_ARGS(targetLSN), - LSN_FORMAT_ARGS(currentLSN)))); - } -} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 10fc18f2529..9ff687045f4 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -24,8 +24,8 @@ #include "access/twophase.h" #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" +#include "access/xlogwait.h" #include "commands/async.h" -#include "commands/waitlsn.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index eaf3916f282..260e7029f50 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -36,7 +36,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" -#include "commands/waitlsn.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h new file mode 100644 index 00000000000..31e208cb7ad --- /dev/null +++ b/src/include/access/xlogwait.h @@ -0,0 +1,81 @@ +/*------------------------------------------------------------------------- + * + * xlogwait.h + * Declarations for LSN replay waiting routines. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * src/include/access/xlogwait.h + * + *------------------------------------------------------------------------- + */ +#ifndef XLOG_WAIT_H +#define XLOG_WAIT_H + +#include "lib/pairingheap.h" +#include "postgres.h" +#include "port/atomics.h" +#include "storage/latch.h" +#include "storage/spin.h" +#include "tcop/dest.h" + +/* + * WaitLSNProcInfo - the shared memory structure representing information + * about the single process, which may wait for LSN replay. An item of + * waitLSN->procInfos array. + */ +typedef struct WaitLSNProcInfo +{ + /* LSN, which this process is waiting for */ + XLogRecPtr waitLSN; + + /* + * A pointer to the latch, which should be set once the waitLSN is + * replayed. + */ + Latch *latch; + + /* A pairing heap node for participation in waitLSNState->waitersHeap */ + pairingheap_node phNode; + + /* + * A flag indicating that this item is present in + * waitLSNState->waitersHeap + */ + bool inHeap; +} WaitLSNProcInfo; + +/* + * WaitLSNState - the shared memory state for the replay LSN waiting facility. + */ +typedef struct WaitLSNState +{ + /* + * The minimum LSN value some process is waiting for. Used for the + * fast-path checking if we need to wake up any waiters after replaying a + * WAL record. Could be read lock-less. Update protected by WaitLSNLock. + */ + pg_atomic_uint64 minWaitedLSN; + + /* + * A pairing heap of waiting processes order by LSN values (least LSN is + * on top). Protected by WaitLSNLock. + */ + pairingheap waitersHeap; + + /* + * An array with per-process information, indexed by the process number. + * Protected by WaitLSNLock. + */ + WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; +} WaitLSNState; + +extern PGDLLIMPORT WaitLSNState *waitLSNState; + +extern Size WaitLSNShmemSize(void); +extern void WaitLSNShmemInit(void); +extern void WaitLSNSetLatches(XLogRecPtr currentLSN); +extern void WaitLSNCleanup(void); +extern void WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout); + +#endif /* XLOG_WAIT_H */ diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h deleted file mode 100644 index bb5ac858dcc..00000000000 --- a/src/include/commands/waitlsn.h +++ /dev/null @@ -1,81 +0,0 @@ -/*------------------------------------------------------------------------- - * - * waitlsn.h - * Declarations for LSN replay waiting routines. - * - * Copyright (c) 2024, PostgreSQL Global Development Group - * - * src/include/commands/waitlsn.h - * - *------------------------------------------------------------------------- - */ -#ifndef WAIT_LSN_H -#define WAIT_LSN_H - -#include "lib/pairingheap.h" -#include "postgres.h" -#include "port/atomics.h" -#include "storage/latch.h" -#include "storage/spin.h" -#include "tcop/dest.h" - -/* - * WaitLSNProcInfo - the shared memory structure representing information - * about the single process, which may wait for LSN replay. An item of - * waitLSN->procInfos array. - */ -typedef struct WaitLSNProcInfo -{ - /* LSN, which this process is waiting for */ - XLogRecPtr waitLSN; - - /* - * A pointer to the latch, which should be set once the waitLSN is - * replayed. - */ - Latch *latch; - - /* A pairing heap node for participation in waitLSNState->waitersHeap */ - pairingheap_node phNode; - - /* - * A flag indicating that this item is present in - * waitLSNState->waitersHeap - */ - bool inHeap; -} WaitLSNProcInfo; - -/* - * WaitLSNState - the shared memory state for the replay LSN waiting facility. - */ -typedef struct WaitLSNState -{ - /* - * The minimum LSN value some process is waiting for. Used for the - * fast-path checking if we need to wake up any waiters after replaying a - * WAL record. Could be read lock-less. Update protected by WaitLSNLock. - */ - pg_atomic_uint64 minWaitedLSN; - - /* - * A pairing heap of waiting processes order by LSN values (least LSN is - * on top). Protected by WaitLSNLock. - */ - pairingheap waitersHeap; - - /* - * An array with per-process information, indexed by the process number. - * Protected by WaitLSNLock. - */ - WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; -} WaitLSNState; - -extern PGDLLIMPORT WaitLSNState *waitLSNState; - -extern Size WaitLSNShmemSize(void); -extern void WaitLSNShmemInit(void); -extern void WaitLSNSetLatches(XLogRecPtr currentLSN); -extern void WaitLSNCleanup(void); -extern void WaitForLSNReplay(XLogRecPtr targetLSN, int64 timeout); - -#endif /* WAIT_LSN_H */ -- cgit v1.2.3