PostgreSQL Source Code git master
slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * slot.c
4 * Replication slot management.
5 *
6 *
7 * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/replication/slot.c
12 *
13 * NOTES
14 *
15 * Replication slots are used to keep state about replication streams
16 * originating from this cluster. Their primary purpose is to prevent the
17 * premature removal of WAL or of old tuple versions in a manner that would
18 * interfere with replication; they are also useful for monitoring purposes.
19 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 * on standbys (to support cascading setups). The requirement that slots be
21 * usable on standbys precludes storing them in the system catalogs.
22 *
23 * Each replication slot gets its own directory inside the directory
24 * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 * contain the slot's own data. Additional data can be stored alongside that
26 * file if required. While the server is running, the state data is also
27 * cached in memory for efficiency.
28 *
29 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 * to iterate over the slots, and in exclusive mode to change the in_use flag
32 * of a slot. The remaining data in each slot is protected by its mutex.
33 *
34 *-------------------------------------------------------------------------
35 */
36
37#include "postgres.h"
38
39#include <unistd.h>
40#include <sys/stat.h>
41
42#include "access/transam.h"
44#include "access/xlogrecovery.h"
45#include "common/file_utils.h"
46#include "common/string.h"
47#include "miscadmin.h"
48#include "pgstat.h"
52#include "replication/slot.h"
54#include "storage/fd.h"
55#include "storage/ipc.h"
56#include "storage/proc.h"
57#include "storage/procarray.h"
58#include "utils/builtins.h"
59#include "utils/guc_hooks.h"
61#include "utils/varlena.h"
62
63/*
64 * Replication slot on-disk data structure.
 *
 * The leading part of the struct must keep the same layout in every on-disk
 * format version. The checksum covers everything from 'version' to the end
 * of the struct; see the ReplicationSlotOnDisk*Size macros for the exact
 * spans.
65 */
67{
68 /* first part of this struct needs to be version independent */
69
70 /* data not covered by checksum */
73
74 /* data covered by checksum */
77
78 /*
79 * The actual data in the slot that follows can differ based on the above
80 * 'version'.
81 */
82
85
86/*
87 * Struct for the configuration of synchronized_standby_slots.
88 *
89 * Note: this must be a flat representation that can be held in a single chunk
90 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 * synchronized_standby_slots GUC.
 * That is why the names are packed into a trailing flexible array rather than
 * being referenced through separately allocated pointers.
92 */
93typedef struct
94{
95 /* Number of slot names stored in slot_names[] */
97
98 /*
99 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 */
101 char slot_names[FLEXIBLE_ARRAY_MEMBER];
103
104/*
105 * Lookup table for slot invalidation causes.
 *
 * Maps each ReplicationSlotInvalidationCause value to its string name
 * (e.g. RS_INVAL_HORIZON is "rows_removed"). The length check that follows
 * this table guards against the table and the enum falling out of sync.
106 */
108{
110 const char *cause_name;
112
114 {RS_INVAL_NONE, "none"},
115 {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 {RS_INVAL_HORIZON, "rows_removed"},
117 {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119};
120
121/*
122 * Ensure that the lookup table is up-to-date with the enums defined in
123 * ReplicationSlotInvalidationCause.
 * Adding a value to that enum therefore also requires adding a matching
 * entry to the lookup table.
124 */
126 "array length mismatch");
127
/*
 * Sizes of the regions of ReplicationSlotOnDisk.
 *
 * Every compound expansion is fully parenthesized so that the macros behave
 * as a single operand inside larger expressions (e.g. "2 * <macro>" or
 * "x - <macro>"); an unparenthesized "a - b" expansion would otherwise bind
 * incorrectly with the surrounding operators.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5	/* version for new files */
143
144/* Control array for replication slot management */
146
147/* My backend's replication slot in the shared memory array */
149
150/* GUC variables */
151int max_replication_slots = 10; /* the maximum number of replication
152 * slots; with 0, no shared memory is set aside for them */
153
154/*
155 * Invalidate replication slots that have remained idle longer than this
156 * duration; '0' disables it.
157 */
159
160/*
161 * This GUC lists streaming replication standby server slot names that
162 * logical WAL sender processes will wait for.
163 */
165
166/* This is the parsed and cached configuration for synchronized_standby_slots */
168
169/*
170 * Oldest LSN that has been confirmed to be flushed to the standbys
171 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 */
174
/* forward declarations of file-local helpers */
175static void ReplicationSlotShmemExit(int code, Datum arg);
176static bool IsSlotForConflictCheck(const char *name);
177static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178
179/* internal persistency functions */
180static void RestoreSlotFromDisk(const char *name);
181static void CreateSlotOnDisk(ReplicationSlot *slot);
182static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183
184/*
185 * Report shared-memory space needed by ReplicationSlotsShmemInit.
 *
 * Returns 0 when max_replication_slots is 0. Otherwise the result is the
 * size of ReplicationSlotCtlData up to its replication_slots array, plus the
 * space for that array.
186 */
187Size
189{
190 Size size = 0;
191
192 if (max_replication_slots == 0)
193 return size;
194
195 size = offsetof(ReplicationSlotCtlData, replication_slots);
196 size = add_size(size,
198
199 return size;
200}
201
202/*
203 * Allocate and initialize shared memory for replication slots.
 *
 * Does nothing when max_replication_slots is 0. The per-slot spinlock and
 * I/O lock are initialized only when ShmemInitStruct() reports that the
 * struct did not already exist ('found' is false).
204 */
205void
207{
208 bool found;
209
210 if (max_replication_slots == 0)
211 return;
212
214 ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 &found);
216
217 if (!found)
218 {
219 int i;
220
221 /* First time through, so initialize */
223
224 for (i = 0; i < max_replication_slots; i++)
225 {
227
228 /* everything else is zeroed by the memset above */
229 SpinLockInit(&slot->mutex);
231 LWTRANCHE_REPLICATION_SLOT_IO);
233 }
234 }
235}
236
237/*
238 * Register the callback for replication slot cleanup and releasing.
 *
 * NOTE(review): the registered callback is presumably
 * ReplicationSlotShmemExit(), declared static in this file -- confirm.
239 */
240void
242{
244}
245
246/*
247 * Release and clean up replication slots.
 *
 * First releases the slot this backend still holds as MyReplicationSlot, if
 * any, and then drops this session's temporary slots.
248 */
249static void
251{
252 /* Make sure active replication slots are released */
253 if (MyReplicationSlot != NULL)
255
256 /* Also clean up all the temporary slots. */
258}
259
260/*
261 * Check whether the passed slot name is valid and report errors at elevel.
262 *
263 * See comments for ReplicationSlotValidateNameInternal().
264 */
265bool
266ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
267 int elevel)
268{
269 int err_code;
270 char *err_msg = NULL;
271 char *err_hint = NULL;
272
273 if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 &err_code, &err_msg, &err_hint))
275 {
276 /*
277 * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 * and errhint(), since the messages from
279 * ReplicationSlotValidateNameInternal() are already translated. This
280 * avoids double translation.
281 */
282 ereport(elevel,
283 errcode(err_code),
284 errmsg_internal("%s", err_msg),
285 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286
287 pfree(err_msg);
288 if (err_hint != NULL)
289 pfree(err_hint);
290 return false;
291 }
292
293 return true;
294}
295
296/*
297 * Check whether the passed slot name is valid.
298 *
299 * A reserved replication slot name is rejected (not reported) if
300 * allow_reserved_name is set to false.
301 *
302 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
303 * the name to be used as a directory name on every supported OS.
 *
 * The checks run in this order, and the first failing one determines the
 * result: empty name, name too long, invalid character, reserved name.
 * This function never reports an error itself; it only fills in the out
 * parameters described below.
304 *
305 * Returns true if the slot name is valid. Otherwise, returns false and stores
306 * the error code, error message, and optional hint in err_code, err_msg, and
307 * err_hint, respectively. The caller is responsible for freeing err_msg and
308 * err_hint, which are palloc'd.
 * err_hint is set to NULL when no hint applies. On success none of the out
 * parameters is written.
309 */
310bool
311ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
312 int *err_code, char **err_msg, char **err_hint)
313{
314 const char *cp;
315
316 if (strlen(name) == 0)
317 {
318 *err_code = ERRCODE_INVALID_NAME;
319 *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 *err_hint = NULL;
321 return false;
322 }
323
324 if (strlen(name) >= NAMEDATALEN)
325 {
326 *err_code = ERRCODE_NAME_TOO_LONG;
327 *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 *err_hint = NULL;
329 return false;
330 }
331
332 for (cp = name; *cp; cp++)
333 {
334 if (!((*cp >= 'a' && *cp <= 'z')
335 || (*cp >= '0' && *cp <= '9')
336 || (*cp == '_')))
337 {
338 *err_code = ERRCODE_INVALID_NAME;
339 *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 return false;
342 }
343 }
344
345 if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 {
347 *err_code = ERRCODE_RESERVED_NAME;
348 *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
351 return false;
352 }
353
354 return true;
355}
356
357/*
358 * Return true if the replication slot name is "pg_conflict_detection".
 *
 * That is, whether it equals CONFLICT_DETECTION_SLOT, the name reserved for
 * the conflict detection slot.
359 */
360static bool
362{
363 return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364}
365
366/*
367 * Create a new replication slot and mark it as used by this backend.
368 *
369 * name: Name of the slot
370 * db_specific: logical decoding is db specific; if the slot is going to
371 * be used for that pass true, otherwise false.
372 * two_phase: Allows decoding of prepared transactions. We allow this option
373 * to be enabled only at the slot creation time. If we allow this option
374 * to be changed during decoding then it is quite possible that we skip
375 * prepare first time because this option was not enabled. Now next time
376 * during getting changes, if the two_phase option is enabled it can skip
377 * prepare because by that time start decoding point has been moved. So the
378 * user will only get commit prepared.
379 * failover: If enabled, allows the slot to be synced to standbys so
380 * that logical replication can be resumed after failover.
381 * synced: True if the slot is synchronized from the primary server.
 * persistency: Persistency of the new slot (stored in slot->data.persistency).
 * Failover cannot be enabled for an RS_TEMPORARY slot, except while
 * synchronizing slots.
 *
 * On success the slot is marked in_use, is active for this process, and
 * becomes MyReplicationSlot. It is an error for a slot with the same name to
 * exist already, or for all max_replication_slots slots to be in use.
382 */
383void
384ReplicationSlotCreate(const char *name, bool db_specific,
385 ReplicationSlotPersistency persistency,
386 bool two_phase, bool failover, bool synced)
387{
388 ReplicationSlot *slot = NULL;
389 int i;
390
391 Assert(MyReplicationSlot == NULL);
392
393 /*
394 * The logical launcher or pg_upgrade may create or migrate an internal
395 * slot, so using a reserved name is allowed in these cases.
396 */
398 ERROR);
399
400 if (failover)
401 {
402 /*
403 * Do not allow users to create the failover enabled slots on the
404 * standby as we do not support sync to the cascading standby.
405 *
406 * However, failover enabled slots can be created during slot
407 * synchronization because we need to retain the same values as the
408 * remote slot.
409 */
412 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
413 errmsg("cannot enable failover for a replication slot created on the standby"));
414
415 /*
416 * Do not allow users to create failover enabled temporary slots,
417 * because temporary slots will not be synced to the standby.
418 *
419 * However, failover enabled temporary slots can be created during
420 * slot synchronization. See the comments atop slotsync.c for details.
421 */
422 if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
424 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
425 errmsg("cannot enable failover for a temporary replication slot"));
426 }
427
428 /*
429 * If some other backend ran this code concurrently with us, we'd likely
430 * both allocate the same slot, and that would be bad. We'd also be at
431 * risk of missing a name collision. Also, we don't want to try to create
432 * a new slot while somebody's busy cleaning up an old one, because we
433 * might both be monkeying with the same directory.
434 */
435 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
436
437 /*
438 * Check for name collision, and identify an allocatable slot. We need to
439 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
440 * else can change the in_use flags while we're looking at them.
441 */
442 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
443 for (i = 0; i < max_replication_slots; i++)
444 {
446
447 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
450 errmsg("replication slot \"%s\" already exists", name)));
451 if (!s->in_use && slot == NULL)
452 slot = s;
453 }
454 LWLockRelease(ReplicationSlotControlLock);
455
456 /* If all slots are in use, we're out of luck. */
457 if (slot == NULL)
459 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
460 errmsg("all replication slots are in use"),
461 errhint("Free one or increase \"max_replication_slots\".")));
462
463 /*
464 * Since this slot is not in use, nobody should be looking at any part of
465 * it other than the in_use field unless they're trying to allocate it.
466 * And since we hold ReplicationSlotAllocationLock, nobody except us can
467 * be doing that. So it's safe to initialize the slot.
468 */
469 Assert(!slot->in_use);
470 Assert(slot->active_pid == 0);
471
472 /* first initialize persistent data */
473 memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
474 namestrcpy(&slot->data.name, name);
475 slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
476 slot->data.persistency = persistency;
477 slot->data.two_phase = two_phase;
479 slot->data.failover = failover;
480 slot->data.synced = synced;
481
482 /* and then data only present in shared memory */
483 slot->just_dirtied = false;
484 slot->dirty = false;
493 slot->inactive_since = 0;
495
496 /*
497 * Create the slot on disk. We haven't actually marked the slot allocated
498 * yet, so no special cleanup is required if this errors out.
499 */
500 CreateSlotOnDisk(slot);
501
502 /*
503 * We need to briefly prevent any other backend from iterating over the
504 * slots while we flip the in_use flag. We also need to set the active
505 * flag while holding the ControlLock as otherwise a concurrent
506 * ReplicationSlotAcquire() could acquire the slot as well.
507 */
508 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
509
510 slot->in_use = true;
511
512 /* We can now mark the slot active, and that makes it our slot. */
513 SpinLockAcquire(&slot->mutex);
514 Assert(slot->active_pid == 0);
515 slot->active_pid = MyProcPid;
516 SpinLockRelease(&slot->mutex);
517 MyReplicationSlot = slot;
518
519 LWLockRelease(ReplicationSlotControlLock);
520
521 /*
522 * Create statistics entry for the new logical slot. We don't collect any
523 * stats for physical slots, so no need to create an entry for the same.
524 * See ReplicationSlotDropPtr for why we need to do this before releasing
525 * ReplicationSlotAllocationLock.
526 */
527 if (SlotIsLogical(slot))
529
530 /*
531 * Now that the slot has been marked as in_use and active, it's safe to
532 * let somebody else try to allocate a slot.
533 */
534 LWLockRelease(ReplicationSlotAllocationLock);
535
536 /* Let everybody know we've modified this slot */
538}
539
540/*
541 * Search for the named replication slot.
542 *
543 * Return the replication slot if found, otherwise NULL.
 *
 * Only slots marked in_use are considered. If need_lock is true,
 * ReplicationSlotControlLock is held in shared mode for the duration of the
 * scan. Pass false only when the caller already holds that lock.
544 */
546SearchNamedReplicationSlot(const char *name, bool need_lock)
547{
548 int i;
549 ReplicationSlot *slot = NULL;
550
551 if (need_lock)
552 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
553
554 for (i = 0; i < max_replication_slots; i++)
555 {
557
558 if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
559 {
560 slot = s;
561 break;
562 }
563 }
564
565 if (need_lock)
566 LWLockRelease(ReplicationSlotControlLock);
567
568 return slot;
569}
570
571/*
572 * Return the index of the replication slot in
573 * ReplicationSlotCtl->replication_slots.
574 *
575 * This is mainly useful to have an efficient key for storing replication slot
576 * stats.
 *
 * 'slot' must point into ReplicationSlotCtl->replication_slots; the bound
 * below checks it against the end of that array.
577 */
578int
580{
582 slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
583
585}
586
587/*
588 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
589 * the slot's name and true is returned.
 * 'name' is left untouched when false is returned.
590 *
591 * This likely is only useful for pgstat_replslot.c during shutdown; in other
592 * cases there are obvious TOCTOU issues.
593 */
594bool
596{
597 ReplicationSlot *slot;
598 bool found;
599
601
602 /*
603 * Ensure that the slot cannot be dropped while we copy the name. Don't
604 * need the spinlock as the name of an existing slot cannot change.
605 */
606 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
607 found = slot->in_use;
608 if (slot->in_use)
610 LWLockRelease(ReplicationSlotControlLock);
611
612 return found;
613}
614
615/*
616 * Find a previously created slot and mark it as used by this process.
617 *
618 * An error is raised if nowait is true and the slot is currently in use. If
619 * nowait is false, we sleep until the slot is released by the owning process.
620 *
621 * An error is raised if error_if_invalid is true and the slot is found to
622 * be invalid. It should always be set to true, except when we are temporarily
623 * acquiring the slot and don't intend to change it.
 *
 * It is also an error if no in-use slot named 'name' exists, or if 'name' is
 * the slot reserved for conflict detection and the caller is not the logical
 * replication launcher. On success the slot's active_pid equals MyProcPid.
624 */
625void
626ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
627{
629 int active_pid;
630
631 Assert(name != NULL);
632
633retry:
634 Assert(MyReplicationSlot == NULL);
635
636 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
637
638 /* Check if the slot exists with the given name. */
640 if (s == NULL || !s->in_use)
641 {
642 LWLockRelease(ReplicationSlotControlLock);
643
645 (errcode(ERRCODE_UNDEFINED_OBJECT),
646 errmsg("replication slot \"%s\" does not exist",
647 name)));
648 }
649
650 /*
651 * Do not allow users to acquire the reserved slot. This scenario may
652 * occur if the launcher that owns the slot has terminated unexpectedly
653 * due to an error, and a backend process attempts to reuse the slot.
654 */
657 errcode(ERRCODE_UNDEFINED_OBJECT),
658 errmsg("cannot acquire replication slot \"%s\"", name),
659 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
660
661 /*
662 * This is the slot we want; check if it's active under some other
663 * process. In single user mode, we don't need this check.
664 */
666 {
667 /*
668 * Get ready to sleep on the slot in case it is active. (We may end
669 * up not sleeping, but we don't want to do this while holding the
670 * spinlock.)
671 */
672 if (!nowait)
674
675 /*
676 * It is important to reset the inactive_since under spinlock here to
677 * avoid race conditions with slot invalidation. See comments related
678 * to inactive_since in InvalidatePossiblyObsoleteSlot.
679 */
681 if (s->active_pid == 0)
683 active_pid = s->active_pid;
686 }
687 else
688 {
689 s->active_pid = active_pid = MyProcPid;
691 }
692 LWLockRelease(ReplicationSlotControlLock);
693
694 /*
695 * If we found the slot but it's already active in another process, we
696 * wait until the owning process signals us that it's been released, or
697 * error out.
698 */
699 if (active_pid != MyProcPid)
700 {
701 if (!nowait)
702 {
703 /* Wait here until we get signaled, and then restart */
705 WAIT_EVENT_REPLICATION_SLOT_DROP);
707 goto retry;
708 }
709
711 (errcode(ERRCODE_OBJECT_IN_USE),
712 errmsg("replication slot \"%s\" is active for PID %d",
713 NameStr(s->data.name), active_pid)));
714 }
715 else if (!nowait)
716 ConditionVariableCancelSleep(); /* no sleep needed after all */
717
718 /* We made this slot active, so it's ours now. */
720
721 /*
722 * We need to check for invalidation after making the slot ours to avoid
723 * the possible race condition with the checkpointer that can otherwise
724 * invalidate the slot immediately after the check.
725 */
726 if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
728 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
729 errmsg("can no longer access replication slot \"%s\"",
730 NameStr(s->data.name)),
731 errdetail("This replication slot has been invalidated due to \"%s\".",
733
734 /* Let everybody know we've modified this slot */
736
737 /*
738 * The call to pgstat_acquire_replslot() protects against stats for a
739 * different slot, from before a restart or such, being present during
740 * pgstat_report_replslot().
741 */
742 if (SlotIsLogical(s))
744
745
746 if (am_walsender)
747 {
750 ? errmsg("acquired logical replication slot \"%s\"",
751 NameStr(s->data.name))
752 : errmsg("acquired physical replication slot \"%s\"",
753 NameStr(s->data.name)));
754 }
755}
756
757/*
758 * Release the replication slot that this backend considers it owns.
759 *
760 * This or another backend can re-acquire the slot later.
761 * Resources this slot requires will be preserved.
 *
 * Exception: a slot whose persistency is RS_EPHEMERAL is dropped here
 * instead of merely being released. In every case MyReplicationSlot is
 * reset to NULL and PROC_IN_LOGICAL_DECODING is cleared from this process.
762 */
763void
765{
767 char *slotname = NULL; /* keep compiler quiet */
768 bool is_logical;
769 TimestampTz now = 0;
770
771 Assert(slot != NULL && slot->active_pid != 0);
772
773 is_logical = SlotIsLogical(slot);
774
775 if (am_walsender)
776 slotname = pstrdup(NameStr(slot->data.name));
777
778 if (slot->data.persistency == RS_EPHEMERAL)
779 {
780 /*
781 * Delete the slot. There is no !PANIC case where this is allowed to
782 * fail; all that may happen is an incomplete cleanup of the on-disk
783 * data.
784 */
786
787 /*
788 * Request to disable logical decoding, even though this slot may not
789 * have been the last logical slot. The checkpointer will verify if
790 * logical decoding should actually be disabled.
791 */
792 if (is_logical)
794 }
795
796 /*
797 * If slot needed to temporarily restrain both data and catalog xmin to
798 * create the catalog snapshot, remove that temporary constraint.
799 * Snapshots can only be exported while the initial snapshot is still
800 * acquired.
801 */
802 if (!TransactionIdIsValid(slot->data.xmin) &&
804 {
805 SpinLockAcquire(&slot->mutex);
807 SpinLockRelease(&slot->mutex);
809 }
810
811 /*
812 * Set the time since the slot has become inactive. We get the current
813 * time beforehand to avoid a system call while holding the spinlock.
814 */
816
817 if (slot->data.persistency == RS_PERSISTENT)
818 {
819 /*
820 * Mark persistent slot inactive. We're not freeing it, just
821 * disconnecting, but wake up others that may be waiting for it.
822 */
823 SpinLockAcquire(&slot->mutex);
824 slot->active_pid = 0;
826 SpinLockRelease(&slot->mutex);
828 }
829 else
831
832 MyReplicationSlot = NULL;
833
834 /* might not have been set when we've been a plain slot */
835 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
836 MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
838 LWLockRelease(ProcArrayLock);
839
840 if (am_walsender)
841 {
843 is_logical
844 ? errmsg("released logical replication slot \"%s\"",
845 slotname)
846 : errmsg("released physical replication slot \"%s\"",
847 slotname));
848
849 pfree(slotname);
850 }
851}
852
853/*
854 * Clean up temporary slots created in current session.
855 *
856 * Clean up only synced temporary slots if 'synced_only' is true, else
857 * clean up all temporary slots.
 *
 * A slot is treated as belonging to this session when its active_pid equals
 * MyProcPid. ReplicationSlotControlLock is released before each drop, so the
 * scan restarts from the first slot after every slot it drops.
858 *
859 * If it drops the last logical slot in the cluster, requests to disable
860 * logical decoding.
861 */
862void
863ReplicationSlotCleanup(bool synced_only)
864{
865 int i;
866 bool found_valid_logicalslot;
867 bool dropped_logical = false;
868
869 Assert(MyReplicationSlot == NULL);
870
871restart:
872 found_valid_logicalslot = false;
873 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
874 for (i = 0; i < max_replication_slots; i++)
875 {
877
878 if (!s->in_use)
879 continue;
880
882
883 found_valid_logicalslot |=
885
886 if ((s->active_pid == MyProcPid &&
887 (!synced_only || s->data.synced)))
888 {
891 LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
892
893 if (SlotIsLogical(s))
894 dropped_logical = true;
895
897
899 goto restart;
900 }
901 else
903 }
904
905 LWLockRelease(ReplicationSlotControlLock);
906
907 if (dropped_logical && !found_valid_logicalslot)
909}
910
911/*
912 * Permanently drop replication slot identified by the passed in name.
 *
 * 'nowait' is passed through to ReplicationSlotAcquire(). The slot is
 * acquired with error_if_invalid = false, so an invalidated slot can still
 * be dropped. Slots currently being synchronized from the primary cannot be
 * dropped.
913 */
914void
915ReplicationSlotDrop(const char *name, bool nowait)
916{
917 bool is_logical;
918
919 Assert(MyReplicationSlot == NULL);
920
921 ReplicationSlotAcquire(name, nowait, false);
922
923 /*
924 * Do not allow users to drop the slots which are currently being synced
925 * from the primary to the standby.
926 */
929 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
930 errmsg("cannot drop replication slot \"%s\"", name),
931 errdetail("This replication slot is being synchronized from the primary server."));
932
933 is_logical = SlotIsLogical(MyReplicationSlot);
934
936
937 if (is_logical)
939}
940
941/*
942 * Change the definition of the slot identified by the specified name.
 *
 * Only logical slots can be altered. A NULL 'failover' leaves the failover
 * setting untouched; 'two_phase' is presumably handled the same way
 * (NOTE(review): confirm). The slot is acquired without nowait, so this
 * waits if another process holds it, and invalidated slots are rejected.
943 */
944void
945ReplicationSlotAlter(const char *name, const bool *failover,
946 const bool *two_phase)
947{
948 bool update_slot = false;
949
950 Assert(MyReplicationSlot == NULL);
952
953 ReplicationSlotAcquire(name, false, true);
954
957 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
958 errmsg("cannot use %s with a physical replication slot",
959 "ALTER_REPLICATION_SLOT"));
960
961 if (RecoveryInProgress())
962 {
963 /*
964 * Do not allow users to alter the slots which are currently being
965 * synced from the primary to the standby.
966 */
969 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
970 errmsg("cannot alter replication slot \"%s\"", name),
971 errdetail("This replication slot is being synchronized from the primary server."));
972
973 /*
974 * Do not allow users to enable failover on the standby as we do not
975 * support sync to the cascading standby.
976 */
977 if (failover && *failover)
979 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
980 errmsg("cannot enable failover for a replication slot"
981 " on the standby"));
982 }
983
984 if (failover)
985 {
986 /*
987 * Do not allow users to enable failover for temporary slots as we do
988 * not support syncing temporary slots to the standby.
989 */
992 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
993 errmsg("cannot enable failover for a temporary replication slot"));
994
996 {
1000
1001 update_slot = true;
1002 }
1003 }
1004
1006 {
1010
1011 update_slot = true;
1012 }
1013
1014 if (update_slot)
1015 {
1018 }
1019
1021}
1022
1023/*
1024 * Permanently drop the currently acquired replication slot.
 *
 * MyReplicationSlot must be set on entry; it is cleared before the slot is
 * dropped.
1025 */
1026void
1028{
1030
1031 Assert(MyReplicationSlot != NULL);
1032
1033 /* slot isn't acquired anymore */
1034 MyReplicationSlot = NULL;
1035
1037}
1038
1039/*
1040 * Permanently drop the replication slot which will be released by the point
1041 * this function returns.
 *
 * ReplicationSlotAllocationLock is held in exclusive mode for the whole
 * operation. The slot directory is first renamed to "<name>.tmp" and then
 * removed. Failure to rename is an ERROR for persistent slots and only a
 * WARNING for other persistencies.
1042 */
1043static void
1045{
1046 char path[MAXPGPATH];
1047 char tmppath[MAXPGPATH];
1048
1049 /*
1050 * If some other backend ran this code concurrently with us, we might try
1051 * to delete a slot with a certain name while someone else was trying to
1052 * create a slot with the same name.
1053 */
1054 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1055
1056 /* Generate pathnames. */
1057 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1058 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1059
1060 /*
1061 * Rename the slot directory on disk, so that we'll no longer recognize
1062 * this as a valid slot. Note that if this fails, we've got to mark the
1063 * slot inactive before bailing out. If we're dropping an ephemeral or a
1064 * temporary slot, we better never fail hard as the caller won't expect
1065 * the slot to survive and this might get called during error handling.
1066 */
1067 if (rename(path, tmppath) == 0)
1068 {
1069 /*
1070 * We need to fsync() the directory we just renamed and its parent to
1071 * make sure that our changes are on disk in a crash-safe fashion. If
1072 * fsync() fails, we can't be sure whether the changes are on disk or
1073 * not. For now, we handle that by panicking;
1074 * StartupReplicationSlots() will try to straighten it out after
1075 * restart.
1076 */
1078 fsync_fname(tmppath, true);
1081 }
1082 else
1083 {
1084 bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1085
1086 SpinLockAcquire(&slot->mutex);
1087 slot->active_pid = 0;
1088 SpinLockRelease(&slot->mutex);
1089
1090 /* wake up anyone waiting on this slot */
1092
1093 ereport(fail_softly ? WARNING : ERROR,
1095 errmsg("could not rename file \"%s\" to \"%s\": %m",
1096 path, tmppath)));
 /*
  * NOTE(review): when fail_softly, the WARNING presumably returns, so
  * execution continues below. The in-memory slot is then still freed
  * even though its directory keeps its original name.
  */
1097 }
1098
1099 /*
1100 * The slot is definitely gone. Lock out concurrent scans of the array
1101 * long enough to kill it. It's OK to clear the active PID here without
1102 * grabbing the mutex because nobody else can be scanning the array here,
1103 * and nobody can be attached to this slot and thus access it without
1104 * scanning the array.
1105 *
1106 * Also wake up processes waiting for it.
1107 */
1108 LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1109 slot->active_pid = 0;
1110 slot->in_use = false;
1111 LWLockRelease(ReplicationSlotControlLock);
1113
1114 /*
1115 * Slot is dead and doesn't prevent resource removal anymore, recompute
1116 * limits.
1117 */
1120
1121 /*
1122 * If removing the directory fails, the worst thing that will happen is
1123 * that the user won't be able to create a new slot with the same name
1124 * until the next server restart. We warn about it, but that's all.
1125 */
1126 if (!rmtree(tmppath, true))
1128 (errmsg("could not remove directory \"%s\"", tmppath)));
1129
1130 /*
1131 * Drop the statistics entry for the replication slot. Do this while
1132 * holding ReplicationSlotAllocationLock so that we don't drop a
1133 * statistics entry for another slot with the same name just created in
1134 * another session.
1135 */
1136 if (SlotIsLogical(slot))
1138
1139 /*
1140 * We release this at the very end, so that nobody starts trying to create
1141 * a slot while we're still cleaning up the detritus of the old one.
1142 */
1143 LWLockRelease(ReplicationSlotAllocationLock);
1144}
1145
1146/*
1147 * Serialize the currently acquired slot's state from memory to disk, thereby
1148 * guaranteeing the current state will survive a crash.
 *
 * A slot must currently be acquired (MyReplicationSlot != NULL).
1149 */
1150void
1152{
1153 char path[MAXPGPATH];
1154
1155 Assert(MyReplicationSlot != NULL);
1156
1159}
1160
1161/*
1162 * Signal that it would be useful if the currently acquired slot would be
1163 * flushed out to disk.
1164 *
1165 * Note that the actual flush to disk can be delayed for a long time; if
1166 * required for correctness, explicitly call ReplicationSlotSave().
 *
 * This only sets the slot's in-memory dirty flag under its spinlock;
 * nothing is written here.
1167 */
1168void
1170{
1172
1173 Assert(MyReplicationSlot != NULL);
1174
1175 SpinLockAcquire(&slot->mutex);
1177 MyReplicationSlot->dirty = true;
1178 SpinLockRelease(&slot->mutex);
1179}
1180
1181/*
1182 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to an
1183 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1184 */
1185void
1187{
1189
1190 Assert(slot != NULL);
1192
1193 SpinLockAcquire(&slot->mutex);
1195 SpinLockRelease(&slot->mutex);
1196
1199}
1200
1201/*
1202 * Compute the oldest xmin across all slots and store it in the ProcArray.
1203 *
1204 * If already_locked is true, both the ReplicationSlotControlLock and the
1205 * ProcArrayLock have already been acquired exclusively. It is crucial that the
1206 * caller first acquires the ReplicationSlotControlLock, followed by the
1207 * ProcArrayLock, to prevent any undetectable deadlocks since this function
1208 * acquires them in that order.
1209 */
1210void
1212{
1213 int i;
1215 TransactionId agg_catalog_xmin = InvalidTransactionId;
1216
1217 Assert(ReplicationSlotCtl != NULL);
1218 Assert(!already_locked ||
1219 (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1220 LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1221
1222 /*
1223 * Hold the ReplicationSlotControlLock until after updating the slot xmin
1224 * values, so no backend updates the initial xmin for newly created slot
1225 * concurrently. A shared lock is used here to minimize lock contention,
1226 * especially when many slots exist and advancements occur frequently.
1227 * This is safe since an exclusive lock is taken during initial slot xmin
1228 * update in slot creation.
1229 *
1230 * One might think that we can hold the ProcArrayLock exclusively and
1231 * update the slot xmin values, but it could increase lock contention on
1232 * the ProcArrayLock, which is not great since this function can be called
1233 * at non-negligible frequency.
1234 *
1235 * Concurrent invocation of this function may cause the computed slot xmin
1236 * to regress. However, this is harmless because tuples prior to the most
1237 * recent xmin are no longer useful once advancement occurs (see
1238 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1239 * before updating the effective_xmin). Thus, such regression merely
1240 * prevents VACUUM from prematurely removing tuples without causing the
1241 * early deletion of required data.
1242 */
1243 if (!already_locked)
1244 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1245
1246 for (i = 0; i < max_replication_slots; i++)
1247 {
1249 TransactionId effective_xmin;
1250 TransactionId effective_catalog_xmin;
1251 bool invalidated;
1252
1253 if (!s->in_use)
1254 continue;
1255
1257 effective_xmin = s->effective_xmin;
1258 effective_catalog_xmin = s->effective_catalog_xmin;
1259 invalidated = s->data.invalidated != RS_INVAL_NONE;
1261
1262 /* invalidated slots need not apply */
1263 if (invalidated)
1264 continue;
1265
1266 /* check the data xmin */
1267 if (TransactionIdIsValid(effective_xmin) &&
1268 (!TransactionIdIsValid(agg_xmin) ||
1269 TransactionIdPrecedes(effective_xmin, agg_xmin)))
1270 agg_xmin = effective_xmin;
1271
1272 /* check the catalog xmin */
1273 if (TransactionIdIsValid(effective_catalog_xmin) &&
1274 (!TransactionIdIsValid(agg_catalog_xmin) ||
1275 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1276 agg_catalog_xmin = effective_catalog_xmin;
1277 }
1278
1279 ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1280
1281 if (!already_locked)
1282 LWLockRelease(ReplicationSlotControlLock);
1283}
1284
1285/*
1286 * Compute the oldest restart LSN across all slots and inform xlog module.
1287 *
1288 * Note: while max_slot_wal_keep_size is theoretically relevant for this
1289 * purpose, we don't try to account for that, because this module doesn't
1290 * know what to compare against.
1291 */
1292void
1294{
1295 int i;
1296 XLogRecPtr min_required = InvalidXLogRecPtr;
1297
1298 Assert(ReplicationSlotCtl != NULL);
1299
1300 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1301 for (i = 0; i < max_replication_slots; i++)
1302 {
1304 XLogRecPtr restart_lsn;
1305 XLogRecPtr last_saved_restart_lsn;
1306 bool invalidated;
1307 ReplicationSlotPersistency persistency;
1308
1309 if (!s->in_use)
1310 continue;
1311
1313 persistency = s->data.persistency;
1314 restart_lsn = s->data.restart_lsn;
1315 invalidated = s->data.invalidated != RS_INVAL_NONE;
1316 last_saved_restart_lsn = s->last_saved_restart_lsn;
1318
1319 /* invalidated slots need not apply */
1320 if (invalidated)
1321 continue;
1322
1323 /*
1324 * For persistent slot use last_saved_restart_lsn to compute the
1325 * oldest LSN for removal of WAL segments. The segments between
1326 * last_saved_restart_lsn and restart_lsn might be needed by a
1327 * persistent slot in the case of database crash. Non-persistent
1328 * slots can't survive the database crash, so we don't care about
1329 * last_saved_restart_lsn for them.
1330 */
1331 if (persistency == RS_PERSISTENT)
1332 {
1333 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1334 restart_lsn > last_saved_restart_lsn)
1335 {
1336 restart_lsn = last_saved_restart_lsn;
1337 }
1338 }
1339
1340 if (XLogRecPtrIsValid(restart_lsn) &&
1341 (!XLogRecPtrIsValid(min_required) ||
1342 restart_lsn < min_required))
1343 min_required = restart_lsn;
1344 }
1345 LWLockRelease(ReplicationSlotControlLock);
1346
1348}
1349
1350/*
1351 * Compute the oldest WAL LSN required by *logical* decoding slots..
1352 *
1353 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1354 * slots exist.
1355 *
1356 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1357 * ignores physical replication slots.
1358 *
1359 * The results aren't required frequently, so we don't maintain a precomputed
1360 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1361 */
1364{
1366 int i;
1367
1368 if (max_replication_slots <= 0)
1369 return InvalidXLogRecPtr;
1370
1371 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1372
1373 for (i = 0; i < max_replication_slots; i++)
1374 {
1375 ReplicationSlot *s;
1376 XLogRecPtr restart_lsn;
1377 XLogRecPtr last_saved_restart_lsn;
1378 bool invalidated;
1379 ReplicationSlotPersistency persistency;
1380
1382
1383 /* cannot change while ReplicationSlotCtlLock is held */
1384 if (!s->in_use)
1385 continue;
1386
1387 /* we're only interested in logical slots */
1388 if (!SlotIsLogical(s))
1389 continue;
1390
1391 /* read once, it's ok if it increases while we're checking */
1393 persistency = s->data.persistency;
1394 restart_lsn = s->data.restart_lsn;
1395 invalidated = s->data.invalidated != RS_INVAL_NONE;
1396 last_saved_restart_lsn = s->last_saved_restart_lsn;
1398
1399 /* invalidated slots need not apply */
1400 if (invalidated)
1401 continue;
1402
1403 /*
1404 * For persistent slot use last_saved_restart_lsn to compute the
1405 * oldest LSN for removal of WAL segments. The segments between
1406 * last_saved_restart_lsn and restart_lsn might be needed by a
1407 * persistent slot in the case of database crash. Non-persistent
1408 * slots can't survive the database crash, so we don't care about
1409 * last_saved_restart_lsn for them.
1410 */
1411 if (persistency == RS_PERSISTENT)
1412 {
1413 if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1414 restart_lsn > last_saved_restart_lsn)
1415 {
1416 restart_lsn = last_saved_restart_lsn;
1417 }
1418 }
1419
1420 if (!XLogRecPtrIsValid(restart_lsn))
1421 continue;
1422
1423 if (!XLogRecPtrIsValid(result) ||
1424 restart_lsn < result)
1425 result = restart_lsn;
1426 }
1427
1428 LWLockRelease(ReplicationSlotControlLock);
1429
1430 return result;
1431}
1432
1433/*
1434 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1435 * passed database oid.
1436 *
1437 * Returns true if there are any slots referencing the database. *nslots will
1438 * be set to the absolute number of slots in the database, *nactive to ones
1439 * currently active.
1440 */
1441bool
1442ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1443{
1444 int i;
1445
1446 *nslots = *nactive = 0;
1447
1448 if (max_replication_slots <= 0)
1449 return false;
1450
1451 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1452 for (i = 0; i < max_replication_slots; i++)
1453 {
1454 ReplicationSlot *s;
1455
1457
1458 /* cannot change while ReplicationSlotCtlLock is held */
1459 if (!s->in_use)
1460 continue;
1461
1462 /* only logical slots are database specific, skip */
1463 if (!SlotIsLogical(s))
1464 continue;
1465
1466 /* not our database, skip */
1467 if (s->data.database != dboid)
1468 continue;
1469
1470 /* NB: intentionally counting invalidated slots */
1471
1472 /* count slots with spinlock held */
1474 (*nslots)++;
1475 if (s->active_pid != 0)
1476 (*nactive)++;
1478 }
1479 LWLockRelease(ReplicationSlotControlLock);
1480
1481 if (*nslots > 0)
1482 return true;
1483 return false;
1484}
1485
1486/*
1487 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1488 * passed database oid. The caller should hold an exclusive lock on the
1489 * pg_database oid for the database to prevent creation of new slots on the db
1490 * or replay from existing slots.
1491 *
1492 * Another session that concurrently acquires an existing slot on the target DB
1493 * (most likely to drop it) may cause this function to ERROR. If that happens
1494 * it may have dropped some but not all slots.
1495 *
1496 * This routine isn't as efficient as it could be - but we don't drop
1497 * databases often, especially databases with lots of slots.
1498 *
1499 * If it drops the last logical slot in the cluster, it requests to disable
1500 * logical decoding.
1501 */
1502void
1504{
1505 int i;
1506 bool found_valid_logicalslot;
1507 bool dropped = false;
1508
1509 if (max_replication_slots <= 0)
1510 return;
1511
1512restart:
1513 found_valid_logicalslot = false;
1514 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1515 for (i = 0; i < max_replication_slots; i++)
1516 {
1517 ReplicationSlot *s;
1518 char *slotname;
1519 int active_pid;
1520
1522
1523 /* cannot change while ReplicationSlotCtlLock is held */
1524 if (!s->in_use)
1525 continue;
1526
1527 /* only logical slots are database specific, skip */
1528 if (!SlotIsLogical(s))
1529 continue;
1530
1531 /*
1532 * Check logical slots on other databases too so we can disable
1533 * logical decoding only if no slots in the cluster.
1534 */
1536 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
1538
1539 /* not our database, skip */
1540 if (s->data.database != dboid)
1541 continue;
1542
1543 /* NB: intentionally including invalidated slots to drop */
1544
1545 /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1547 /* can't change while ReplicationSlotControlLock is held */
1548 slotname = NameStr(s->data.name);
1549 active_pid = s->active_pid;
1550 if (active_pid == 0)
1551 {
1553 s->active_pid = MyProcPid;
1554 }
1556
1557 /*
1558 * Even though we hold an exclusive lock on the database object a
1559 * logical slot for that DB can still be active, e.g. if it's
1560 * concurrently being dropped by a backend connected to another DB.
1561 *
1562 * That's fairly unlikely in practice, so we'll just bail out.
1563 *
1564 * The slot sync worker holds a shared lock on the database before
1565 * operating on synced logical slots to avoid conflict with the drop
1566 * happening here. The persistent synced slots are thus safe but there
1567 * is a possibility that the slot sync worker has created a temporary
1568 * slot (which stays active even on release) and we are trying to drop
1569 * that here. In practice, the chances of hitting this scenario are
1570 * less as during slot synchronization, the temporary slot is
1571 * immediately converted to persistent and thus is safe due to the
1572 * shared lock taken on the database. So, we'll just bail out in such
1573 * a case.
1574 *
1575 * XXX: We can consider shutting down the slot sync worker before
1576 * trying to drop synced temporary slots here.
1577 */
1578 if (active_pid)
1579 ereport(ERROR,
1580 (errcode(ERRCODE_OBJECT_IN_USE),
1581 errmsg("replication slot \"%s\" is active for PID %d",
1582 slotname, active_pid)));
1583
1584 /*
1585 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1586 * holding ReplicationSlotControlLock over filesystem operations,
1587 * release ReplicationSlotControlLock and use
1588 * ReplicationSlotDropAcquired.
1589 *
1590 * As that means the set of slots could change, restart scan from the
1591 * beginning each time we release the lock.
1592 */
1593 LWLockRelease(ReplicationSlotControlLock);
1595 dropped = true;
1596 goto restart;
1597 }
1598 LWLockRelease(ReplicationSlotControlLock);
1599
1600 if (dropped && !found_valid_logicalslot)
1602}
1603
1604/*
1605 * Returns true if there is at least one in-use valid logical replication slot.
1606 */
1607bool
1609{
1610 bool found = false;
1611
1612 if (max_replication_slots <= 0)
1613 return false;
1614
1615 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1616 for (int i = 0; i < max_replication_slots; i++)
1617 {
1618 ReplicationSlot *s;
1619 bool invalidated;
1620
1622
1623 /* cannot change while ReplicationSlotCtlLock is held */
1624 if (!s->in_use)
1625 continue;
1626
1627 if (SlotIsPhysical(s))
1628 continue;
1629
1631 invalidated = s->data.invalidated != RS_INVAL_NONE;
1633
1634 if (invalidated)
1635 continue;
1636
1637 found = true;
1638 break;
1639 }
1640 LWLockRelease(ReplicationSlotControlLock);
1641
1642 return found;
1643}
1644
1645/*
1646 * Check whether the server's configuration supports using replication
1647 * slots.
1648 */
1649void
1651{
1652 /*
1653 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1654 * needs the same check.
1655 */
1656
1657 if (max_replication_slots == 0)
1658 ereport(ERROR,
1659 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1660 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1661
1663 ereport(ERROR,
1664 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1665 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1666}
1667
1668/*
1669 * Check whether the user has privilege to use replication slots.
1670 */
1671void
1673{
1675 ereport(ERROR,
1676 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1677 errmsg("permission denied to use replication slots"),
1678 errdetail("Only roles with the %s attribute may use replication slots.",
1679 "REPLICATION")));
1680}
1681
1682/*
1683 * Reserve WAL for the currently active slot.
1684 *
1685 * Compute and set restart_lsn in a manner that's appropriate for the type of
1686 * the slot and concurrency safe.
1687 */
1688void
1690{
1692 XLogSegNo segno;
1693 XLogRecPtr restart_lsn;
1694
1695 Assert(slot != NULL);
1698
1699 /*
1700 * The replication slot mechanism is used to prevent the removal of
1701 * required WAL.
1702 *
1703 * Acquire an exclusive lock to prevent the checkpoint process from
1704 * concurrently computing the minimum slot LSN (see
1705 * CheckPointReplicationSlots). This ensures that the WAL reserved for
1706 * replication cannot be removed during a checkpoint.
1707 *
1708 * The mechanism is reliable because if WAL reservation occurs first, the
1709 * checkpoint must wait for the restart_lsn update before determining the
1710 * minimum non-removable LSN. On the other hand, if the checkpoint happens
1711 * first, subsequent WAL reservations will select positions at or beyond
1712 * the redo pointer of that checkpoint.
1713 */
1714 LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1715
1716 /*
1717 * For logical slots log a standby snapshot and start logical decoding at
1718 * exactly that position. That allows the slot to start up more quickly.
1719 * But on a standby we cannot do WAL writes, so just use the replay
1720 * pointer; effectively, an attempt to create a logical slot on standby
1721 * will cause it to wait for an xl_running_xact record to be logged
1722 * independently on the primary, so that a snapshot can be built using the
1723 * record.
1724 *
1725 * None of this is needed (or indeed helpful) for physical slots as
1726 * they'll start replay at the last logged checkpoint anyway. Instead,
1727 * return the location of the last redo LSN, where a base backup has to
1728 * start replay at.
1729 */
1730 if (SlotIsPhysical(slot))
1731 restart_lsn = GetRedoRecPtr();
1732 else if (RecoveryInProgress())
1733 restart_lsn = GetXLogReplayRecPtr(NULL);
1734 else
1735 restart_lsn = GetXLogInsertRecPtr();
1736
1737 SpinLockAcquire(&slot->mutex);
1738 slot->data.restart_lsn = restart_lsn;
1739 SpinLockRelease(&slot->mutex);
1740
1741 /* prevent WAL removal as fast as possible */
1743
1744 /* Checkpoint shouldn't remove the required WAL. */
1746 if (XLogGetLastRemovedSegno() >= segno)
1747 elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1748 NameStr(slot->data.name));
1749
1750 LWLockRelease(ReplicationSlotAllocationLock);
1751
1752 if (!RecoveryInProgress() && SlotIsLogical(slot))
1753 {
1754 XLogRecPtr flushptr;
1755
1756 /* make sure we have enough information to start */
1757 flushptr = LogStandbySnapshot();
1758
1759 /* and make sure it's fsynced to disk */
1760 XLogFlush(flushptr);
1761 }
1762}
1763
1764/*
1765 * Report that replication slot needs to be invalidated
1766 */
1767static void
1769 bool terminating,
1770 int pid,
1771 NameData slotname,
1772 XLogRecPtr restart_lsn,
1773 XLogRecPtr oldestLSN,
1774 TransactionId snapshotConflictHorizon,
1775 long slot_idle_seconds)
1776{
1777 StringInfoData err_detail;
1778 StringInfoData err_hint;
1779
1780 initStringInfo(&err_detail);
1781 initStringInfo(&err_hint);
1782
1783 switch (cause)
1784 {
1786 {
1787 uint64 ex = oldestLSN - restart_lsn;
1788
1789 appendStringInfo(&err_detail,
1790 ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1791 "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1792 ex),
1793 LSN_FORMAT_ARGS(restart_lsn),
1794 ex);
1795 /* translator: %s is a GUC variable name */
1796 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1797 "max_slot_wal_keep_size");
1798 break;
1799 }
1800 case RS_INVAL_HORIZON:
1801 appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1802 snapshotConflictHorizon);
1803 break;
1804
1805 case RS_INVAL_WAL_LEVEL:
1806 appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1807 break;
1808
1810 {
1811 /* translator: %s is a GUC variable name */
1812 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1813 slot_idle_seconds, "idle_replication_slot_timeout",
1815 /* translator: %s is a GUC variable name */
1816 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1817 "idle_replication_slot_timeout");
1818 break;
1819 }
1820 case RS_INVAL_NONE:
1822 }
1823
1824 ereport(LOG,
1825 terminating ?
1826 errmsg("terminating process %d to release replication slot \"%s\"",
1827 pid, NameStr(slotname)) :
1828 errmsg("invalidating obsolete replication slot \"%s\"",
1829 NameStr(slotname)),
1830 errdetail_internal("%s", err_detail.data),
1831 err_hint.len ? errhint("%s", err_hint.data) : 0);
1832
1833 pfree(err_detail.data);
1834 pfree(err_hint.data);
1835}
1836
1837/*
1838 * Can we invalidate an idle replication slot?
1839 *
1840 * Idle timeout invalidation is allowed only when:
1841 *
1842 * 1. Idle timeout is set
1843 * 2. Slot has reserved WAL
1844 * 3. Slot is inactive
1845 * 4. The slot is not being synced from the primary while the server is in
1846 * recovery. This is because synced slots are always considered to be
1847 * inactive because they don't perform logical decoding to produce changes.
1848 */
1849static inline bool
1851{
1854 s->inactive_since > 0 &&
1855 !(RecoveryInProgress() && s->data.synced));
1856}
1857
1858/*
1859 * DetermineSlotInvalidationCause - Determine the cause for which a slot
1860 * becomes invalid among the given possible causes.
1861 *
1862 * This function sequentially checks all possible invalidation causes and
1863 * returns the first one for which the slot is eligible for invalidation.
1864 */
1867 XLogRecPtr oldestLSN, Oid dboid,
1868 TransactionId snapshotConflictHorizon,
1869 TimestampTz *inactive_since, TimestampTz now)
1870{
1871 Assert(possible_causes != RS_INVAL_NONE);
1872
1873 if (possible_causes & RS_INVAL_WAL_REMOVED)
1874 {
1875 XLogRecPtr restart_lsn = s->data.restart_lsn;
1876
1877 if (XLogRecPtrIsValid(restart_lsn) &&
1878 restart_lsn < oldestLSN)
1879 return RS_INVAL_WAL_REMOVED;
1880 }
1881
1882 if (possible_causes & RS_INVAL_HORIZON)
1883 {
1884 /* invalid DB oid signals a shared relation */
1885 if (SlotIsLogical(s) &&
1886 (dboid == InvalidOid || dboid == s->data.database))
1887 {
1888 TransactionId effective_xmin = s->effective_xmin;
1889 TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1890
1891 if (TransactionIdIsValid(effective_xmin) &&
1892 TransactionIdPrecedesOrEquals(effective_xmin,
1893 snapshotConflictHorizon))
1894 return RS_INVAL_HORIZON;
1895 else if (TransactionIdIsValid(catalog_effective_xmin) &&
1896 TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1897 snapshotConflictHorizon))
1898 return RS_INVAL_HORIZON;
1899 }
1900 }
1901
1902 if (possible_causes & RS_INVAL_WAL_LEVEL)
1903 {
1904 if (SlotIsLogical(s))
1905 return RS_INVAL_WAL_LEVEL;
1906 }
1907
1908 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1909 {
1910 Assert(now > 0);
1911
1912 if (CanInvalidateIdleSlot(s))
1913 {
1914 /*
1915 * Simulate the invalidation due to idle_timeout to test the
1916 * timeout behavior promptly, without waiting for it to trigger
1917 * naturally.
1918 */
1919#ifdef USE_INJECTION_POINTS
1920 if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1921 {
1922 *inactive_since = 0; /* since the beginning of time */
1923 return RS_INVAL_IDLE_TIMEOUT;
1924 }
1925#endif
1926
1927 /*
1928 * Check if the slot needs to be invalidated due to
1929 * idle_replication_slot_timeout GUC.
1930 */
1933 {
1934 *inactive_since = s->inactive_since;
1935 return RS_INVAL_IDLE_TIMEOUT;
1936 }
1937 }
1938 }
1939
1940 return RS_INVAL_NONE;
1941}
1942
1943/*
1944 * Helper for InvalidateObsoleteReplicationSlots
1945 *
1946 * Acquires the given slot and mark it invalid, if necessary and possible.
1947 *
1948 * Returns true if the slot was invalidated.
1949 *
1950 * Set *released_lock_out if ReplicationSlotControlLock was released in the
1951 * interim (and in that case we're not holding the lock at return, otherwise
1952 * we are).
1953 *
1954 * This is inherently racy, because we release the LWLock
1955 * for syscalls, so caller must restart if we return true.
1956 */
1957static bool
1959 ReplicationSlot *s,
1960 XLogRecPtr oldestLSN,
1961 Oid dboid, TransactionId snapshotConflictHorizon,
1962 bool *released_lock_out)
1963{
1964 int last_signaled_pid = 0;
1965 bool released_lock = false;
1966 bool invalidated = false;
1967 TimestampTz inactive_since = 0;
1968
1969 for (;;)
1970 {
1971 XLogRecPtr restart_lsn;
1972 NameData slotname;
1973 int active_pid = 0;
1975 TimestampTz now = 0;
1976 long slot_idle_secs = 0;
1977
1978 Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1979
1980 if (!s->in_use)
1981 {
1982 if (released_lock)
1983 LWLockRelease(ReplicationSlotControlLock);
1984 break;
1985 }
1986
1987 if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1988 {
1989 /*
1990 * Assign the current time here to avoid system call overhead
1991 * while holding the spinlock in subsequent code.
1992 */
1994 }
1995
1996 /*
1997 * Check if the slot needs to be invalidated. If it needs to be
1998 * invalidated, and is not currently acquired, acquire it and mark it
1999 * as having been invalidated. We do this with the spinlock held to
2000 * avoid race conditions -- for example the restart_lsn could move
2001 * forward, or the slot could be dropped.
2002 */
2004
2005 restart_lsn = s->data.restart_lsn;
2006
2007 /* we do nothing if the slot is already invalid */
2008 if (s->data.invalidated == RS_INVAL_NONE)
2009 invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
2010 s, oldestLSN,
2011 dboid,
2012 snapshotConflictHorizon,
2013 &inactive_since,
2014 now);
2015
2016 /* if there's no invalidation, we're done */
2017 if (invalidation_cause == RS_INVAL_NONE)
2018 {
2020 if (released_lock)
2021 LWLockRelease(ReplicationSlotControlLock);
2022 break;
2023 }
2024
2025 slotname = s->data.name;
2026 active_pid = s->active_pid;
2027
2028 /*
2029 * If the slot can be acquired, do so and mark it invalidated
2030 * immediately. Otherwise we'll signal the owning process, below, and
2031 * retry.
2032 *
2033 * Note: Unlike other slot attributes, slot's inactive_since can't be
2034 * changed until the acquired slot is released or the owning process
2035 * is terminated. So, the inactive slot can only be invalidated
2036 * immediately without being terminated.
2037 */
2038 if (active_pid == 0)
2039 {
2041 s->active_pid = MyProcPid;
2042 s->data.invalidated = invalidation_cause;
2043
2044 /*
2045 * XXX: We should consider not overwriting restart_lsn and instead
2046 * just rely on .invalidated.
2047 */
2048 if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2049 {
2052 }
2053
2054 /* Let caller know */
2055 invalidated = true;
2056 }
2057
2059
2060 /*
2061 * Calculate the idle time duration of the slot if slot is marked
2062 * invalidated with RS_INVAL_IDLE_TIMEOUT.
2063 */
2064 if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2065 {
2066 int slot_idle_usecs;
2067
2068 TimestampDifference(inactive_since, now, &slot_idle_secs,
2069 &slot_idle_usecs);
2070 }
2071
2072 if (active_pid != 0)
2073 {
2074 /*
2075 * Prepare the sleep on the slot's condition variable before
2076 * releasing the lock, to close a possible race condition if the
2077 * slot is released before the sleep below.
2078 */
2080
2081 LWLockRelease(ReplicationSlotControlLock);
2082 released_lock = true;
2083
2084 /*
2085 * Signal to terminate the process that owns the slot, if we
2086 * haven't already signalled it. (Avoidance of repeated
2087 * signalling is the only reason for there to be a loop in this
2088 * routine; otherwise we could rely on caller's restart loop.)
2089 *
2090 * There is the race condition that other process may own the slot
2091 * after its current owner process is terminated and before this
2092 * process owns it. To handle that, we signal only if the PID of
2093 * the owning process has changed from the previous time. (This
2094 * logic assumes that the same PID is not reused very quickly.)
2095 */
2096 if (last_signaled_pid != active_pid)
2097 {
2098 ReportSlotInvalidation(invalidation_cause, true, active_pid,
2099 slotname, restart_lsn,
2100 oldestLSN, snapshotConflictHorizon,
2101 slot_idle_secs);
2102
2103 if (MyBackendType == B_STARTUP)
2104 (void) SendProcSignal(active_pid,
2107 else
2108 (void) kill(active_pid, SIGTERM);
2109
2110 last_signaled_pid = active_pid;
2111 }
2112
2113 /* Wait until the slot is released. */
2115 WAIT_EVENT_REPLICATION_SLOT_DROP);
2116
2117 /*
2118 * Re-acquire lock and start over; we expect to invalidate the
2119 * slot next time (unless another process acquires the slot in the
2120 * meantime).
2121 *
2122 * Note: It is possible for a slot to advance its restart_lsn or
2123 * xmin values sufficiently between when we release the mutex and
2124 * when we recheck, moving from a conflicting state to a non
2125 * conflicting state. This is intentional and safe: if the slot
2126 * has caught up while we're busy here, the resources we were
2127 * concerned about (WAL segments or tuples) have not yet been
2128 * removed, and there's no reason to invalidate the slot.
2129 */
2130 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2131 continue;
2132 }
2133 else
2134 {
2135 /*
2136 * We hold the slot now and have already invalidated it; flush it
2137 * to ensure that state persists.
2138 *
2139 * Don't want to hold ReplicationSlotControlLock across file
2140 * system operations, so release it now but be sure to tell caller
2141 * to restart from scratch.
2142 */
2143 LWLockRelease(ReplicationSlotControlLock);
2144 released_lock = true;
2145
2146 /* Make sure the invalidated state persists across server restart */
2150
2151 ReportSlotInvalidation(invalidation_cause, false, active_pid,
2152 slotname, restart_lsn,
2153 oldestLSN, snapshotConflictHorizon,
2154 slot_idle_secs);
2155
2156 /* done with this slot for now */
2157 break;
2158 }
2159 }
2160
2161 Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2162
2163 *released_lock_out = released_lock;
2164 return invalidated;
2165}
2166
2167/*
2168 * Invalidate slots that require resources about to be removed.
2169 *
2170 * Returns true when any slot have got invalidated.
2171 *
2172 * Whether a slot needs to be invalidated depends on the invalidation cause.
2173 * A slot is invalidated if it:
2174 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2175 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2176 * db; dboid may be InvalidOid for shared relations
2177 * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2178 * logical.
2179 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2180 * "idle_replication_slot_timeout" duration.
2181 *
2182 * Note: This function attempts to invalidate the slot for multiple possible
2183 * causes in a single pass, minimizing redundant iterations. The "cause"
2184 * parameter can be a MASK representing one or more of the defined causes.
2185 *
2186 * If it invalidates the last logical slot in the cluster, it requests to
2187 * disable logical decoding.
2188 *
2189 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2190 */
2191bool
2193 XLogSegNo oldestSegno, Oid dboid,
2194 TransactionId snapshotConflictHorizon)
2195{
2196 XLogRecPtr oldestLSN;
2197 bool invalidated = false;
2198 bool invalidated_logical = false;
2199 bool found_valid_logicalslot;
2200
2201 Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2202 Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2203 Assert(possible_causes != RS_INVAL_NONE);
2204
2205 if (max_replication_slots == 0)
2206 return invalidated;
2207
2208 XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2209
2210restart:
2211 found_valid_logicalslot = false;
2212 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2213 for (int i = 0; i < max_replication_slots; i++)
2214 {
2216 bool released_lock = false;
2217
2218 if (!s->in_use)
2219 continue;
2220
2221 /* Prevent invalidation of logical slots during binary upgrade */
2223 {
2225 found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2227
2228 continue;
2229 }
2230
2231 if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2232 dboid, snapshotConflictHorizon,
2233 &released_lock))
2234 {
2235 Assert(released_lock);
2236
2237 /* Remember we have invalidated a physical or logical slot */
2238 invalidated = true;
2239
2240 /*
2241 * Additionally, remember we have invalidated a logical slot as we
2242 * can request disabling logical decoding later.
2243 */
2244 if (SlotIsLogical(s))
2245 invalidated_logical = true;
2246 }
2247 else
2248 {
2249 /*
2250 * We need to check if the slot is invalidated here since
2251 * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2252 * is already invalidated.
2253 */
2255 found_valid_logicalslot |=
2258 }
2259
2260 /* if the lock was released, start from scratch */
2261 if (released_lock)
2262 goto restart;
2263 }
2264 LWLockRelease(ReplicationSlotControlLock);
2265
2266 /*
2267 * If any slots have been invalidated, recalculate the resource limits.
2268 */
2269 if (invalidated)
2270 {
2273 }
2274
2275 /*
2276 * Request the checkpointer to disable logical decoding if no valid
2277 * logical slots remain. If called by the checkpointer during a
2278 * checkpoint, only the request is initiated; actual deactivation is
2279 * deferred until after the checkpoint completes.
2280 */
2281 if (invalidated_logical && !found_valid_logicalslot)
2283
2284 return invalidated;
2285}
2286
2287/*
2288 * Flush all replication slots to disk.
2289 *
2290 * It is convenient to flush dirty replication slots at the time of checkpoint.
2291 * Additionally, in case of a shutdown checkpoint, we also identify the slots
2292 * for which the confirmed_flush LSN has been updated since the last time it
2293 * was saved and flush them.
2294 */
2295void
2297{
2298 int i;
2299 bool last_saved_restart_lsn_updated = false;
2300
2301 elog(DEBUG1, "performing replication slot checkpoint");
2302
2303 /*
2304 * Prevent any slot from being created/dropped while we're active. As we
2305 * explicitly do *not* want to block iterating over replication_slots or
2306 * acquiring a slot we cannot take the control lock - but that's OK,
2307 * because holding ReplicationSlotAllocationLock is strictly stronger, and
2308 * enough to guarantee that nobody can change the in_use bits on us.
2309 *
2310 * Additionally, acquiring the Allocation lock is necessary to serialize
2311 * the slot flush process with concurrent slot WAL reservation. This
2312 * ensures that the WAL position being reserved is either flushed to disk
2313 * or is beyond or equal to the redo pointer of the current checkpoint
2314 * (See ReplicationSlotReserveWal for details).
2315 */
2316 LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2317
2318 for (i = 0; i < max_replication_slots; i++)
2319 {
2321 char path[MAXPGPATH];
2322
2323 if (!s->in_use)
2324 continue;
2325
2326 /* save the slot to disk, locking is handled in SaveSlotToPath() */
2327 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2328
2329 /*
2330 * Slot's data is not flushed each time the confirmed_flush LSN is
2331 * updated as that could lead to frequent writes. However, we decide
2332 * to force a flush of all logical slot's data at the time of shutdown
2333 * if the confirmed_flush LSN is changed since we last flushed it to
2334 * disk. This helps in avoiding an unnecessary retreat of the
2335 * confirmed_flush LSN after restart.
2336 */
2337 if (is_shutdown && SlotIsLogical(s))
2338 {
2340
2341 if (s->data.invalidated == RS_INVAL_NONE &&
2343 {
2344 s->just_dirtied = true;
2345 s->dirty = true;
2346 }
2348 }
2349
2350 /*
2351 * Track if we're going to update slot's last_saved_restart_lsn. We
2352 * need this to know if we need to recompute the required LSN.
2353 */
2355 last_saved_restart_lsn_updated = true;
2356
2357 SaveSlotToPath(s, path, LOG);
2358 }
2359 LWLockRelease(ReplicationSlotAllocationLock);
2360
2361 /*
2362 * Recompute the required LSN if SaveSlotToPath() updated
2363 * last_saved_restart_lsn for any slot.
2364 */
2365 if (last_saved_restart_lsn_updated)
2367}
2368
2369/*
2370 * Load all replication slots from disk into memory at server startup. This
2371 * needs to be run before we start crash recovery.
2372 */
2373void
2375{
2376 DIR *replication_dir;
2377 struct dirent *replication_de;
2378
2379 elog(DEBUG1, "starting up replication slots");
2380
2381 /* restore all slots by iterating over all on-disk entries */
2382 replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2383 while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2384 {
2385 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2386 PGFileType de_type;
2387
2388 if (strcmp(replication_de->d_name, ".") == 0 ||
2389 strcmp(replication_de->d_name, "..") == 0)
2390 continue;
2391
2392 snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2393 de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2394
2395 /* we're only creating directories here, skip if it's not our's */
2396 if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2397 continue;
2398
2399 /* we crashed while a slot was being setup or deleted, clean up */
2400 if (pg_str_endswith(replication_de->d_name, ".tmp"))
2401 {
2402 if (!rmtree(path, true))
2403 {
2405 (errmsg("could not remove directory \"%s\"",
2406 path)));
2407 continue;
2408 }
2410 continue;
2411 }
2412
2413 /* looks like a slot in a normal state, restore */
2414 RestoreSlotFromDisk(replication_de->d_name);
2415 }
2416 FreeDir(replication_dir);
2417
2418 /* currently no slots exist, we're done. */
2419 if (max_replication_slots <= 0)
2420 return;
2421
2422 /* Now that we have recovered all the data, compute replication xmin */
2425}
2426
2427/* ----
2428 * Manipulation of on-disk state of replication slots
2429 *
2430 * NB: none of the routines below should take any notice whether a slot is the
2431 * current one or not, that's all handled a layer above.
2432 * ----
2433 */
2434static void
2436{
2437 char tmppath[MAXPGPATH];
2438 char path[MAXPGPATH];
2439 struct stat st;
2440
2441 /*
2442 * No need to take out the io_in_progress_lock, nobody else can see this
2443 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2444 * takes out the lock, if we'd take the lock here, we'd deadlock.
2445 */
2446
2447 sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2448 sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2449
2450 /*
2451 * It's just barely possible that some previous effort to create or drop a
2452 * slot with this name left a temp directory lying around. If that seems
2453 * to be the case, try to remove it. If the rmtree() fails, we'll error
2454 * out at the MakePGDirectory() below, so we don't bother checking
2455 * success.
2456 */
2457 if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2458 rmtree(tmppath, true);
2459
2460 /* Create and fsync the temporary slot directory. */
2461 if (MakePGDirectory(tmppath) < 0)
2462 ereport(ERROR,
2464 errmsg("could not create directory \"%s\": %m",
2465 tmppath)));
2466 fsync_fname(tmppath, true);
2467
2468 /* Write the actual state file. */
2469 slot->dirty = true; /* signal that we really need to write */
2470 SaveSlotToPath(slot, tmppath, ERROR);
2471
2472 /* Rename the directory into place. */
2473 if (rename(tmppath, path) != 0)
2474 ereport(ERROR,
2476 errmsg("could not rename file \"%s\" to \"%s\": %m",
2477 tmppath, path)));
2478
2479 /*
2480 * If we'd now fail - really unlikely - we wouldn't know whether this slot
2481 * would persist after an OS crash or not - so, force a restart. The
2482 * restart would try to fsync this again till it works.
2483 */
2485
2486 fsync_fname(path, true);
2488
2490}
2491
2492/*
2493 * Shared functionality between saving and creating a replication slot.
2494 */
2495static void
2496SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2497{
2498 char tmppath[MAXPGPATH];
2499 char path[MAXPGPATH];
2500 int fd;
2502 bool was_dirty;
2503
2504 /* first check whether there's something to write out */
2505 SpinLockAcquire(&slot->mutex);
2506 was_dirty = slot->dirty;
2507 slot->just_dirtied = false;
2508 SpinLockRelease(&slot->mutex);
2509
2510 /* and don't do anything if there's nothing to write */
2511 if (!was_dirty)
2512 return;
2513
2515
2516 /* silence valgrind :( */
2517 memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2518
2519 sprintf(tmppath, "%s/state.tmp", dir);
2520 sprintf(path, "%s/state", dir);
2521
2522 fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2523 if (fd < 0)
2524 {
2525 /*
2526 * If not an ERROR, then release the lock before returning. In case
2527 * of an ERROR, the error recovery path automatically releases the
2528 * lock, but no harm in explicitly releasing even in that case. Note
2529 * that LWLockRelease() could affect errno.
2530 */
2531 int save_errno = errno;
2532
2534 errno = save_errno;
2535 ereport(elevel,
2537 errmsg("could not create file \"%s\": %m",
2538 tmppath)));
2539 return;
2540 }
2541
2542 cp.magic = SLOT_MAGIC;
2544 cp.version = SLOT_VERSION;
2546
2547 SpinLockAcquire(&slot->mutex);
2548
2549 memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2550
2551 SpinLockRelease(&slot->mutex);
2552
2556 FIN_CRC32C(cp.checksum);
2557
2558 errno = 0;
2559 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2560 if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2561 {
2562 int save_errno = errno;
2563
2566 unlink(tmppath);
2568
2569 /* if write didn't set errno, assume problem is no disk space */
2570 errno = save_errno ? save_errno : ENOSPC;
2571 ereport(elevel,
2573 errmsg("could not write to file \"%s\": %m",
2574 tmppath)));
2575 return;
2576 }
2578
2579 /* fsync the temporary file */
2580 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2581 if (pg_fsync(fd) != 0)
2582 {
2583 int save_errno = errno;
2584
2587 unlink(tmppath);
2589
2590 errno = save_errno;
2591 ereport(elevel,
2593 errmsg("could not fsync file \"%s\": %m",
2594 tmppath)));
2595 return;
2596 }
2598
2599 if (CloseTransientFile(fd) != 0)
2600 {
2601 int save_errno = errno;
2602
2603 unlink(tmppath);
2605
2606 errno = save_errno;
2607 ereport(elevel,
2609 errmsg("could not close file \"%s\": %m",
2610 tmppath)));
2611 return;
2612 }
2613
2614 /* rename to permanent file, fsync file and directory */
2615 if (rename(tmppath, path) != 0)
2616 {
2617 int save_errno = errno;
2618
2619 unlink(tmppath);
2621
2622 errno = save_errno;
2623 ereport(elevel,
2625 errmsg("could not rename file \"%s\" to \"%s\": %m",
2626 tmppath, path)));
2627 return;
2628 }
2629
2630 /*
2631 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2632 */
2634
2635 fsync_fname(path, false);
2636 fsync_fname(dir, true);
2638
2640
2641 /*
2642 * Successfully wrote, unset dirty bit, unless somebody dirtied again
2643 * already and remember the confirmed_flush LSN value.
2644 */
2645 SpinLockAcquire(&slot->mutex);
2646 if (!slot->just_dirtied)
2647 slot->dirty = false;
2650 SpinLockRelease(&slot->mutex);
2651
2653}
2654
2655/*
2656 * Load a single slot from disk into memory.
2657 */
2658static void
2660{
2662 int i;
2663 char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2664 char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2665 int fd;
2666 bool restored = false;
2667 int readBytes;
2668 pg_crc32c checksum;
2669 TimestampTz now = 0;
2670
2671 /* no need to lock here, no concurrent access allowed yet */
2672
2673 /* delete temp file if it exists */
2674 sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2675 sprintf(path, "%s/state.tmp", slotdir);
2676 if (unlink(path) < 0 && errno != ENOENT)
2677 ereport(PANIC,
2679 errmsg("could not remove file \"%s\": %m", path)));
2680
2681 sprintf(path, "%s/state", slotdir);
2682
2683 elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2684
2685 /* on some operating systems fsyncing a file requires O_RDWR */
2686 fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2687
2688 /*
2689 * We do not need to handle this as we are rename()ing the directory into
2690 * place only after we fsync()ed the state file.
2691 */
2692 if (fd < 0)
2693 ereport(PANIC,
2695 errmsg("could not open file \"%s\": %m", path)));
2696
2697 /*
2698 * Sync state file before we're reading from it. We might have crashed
2699 * while it wasn't synced yet and we shouldn't continue on that basis.
2700 */
2701 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2702 if (pg_fsync(fd) != 0)
2703 ereport(PANIC,
2705 errmsg("could not fsync file \"%s\": %m",
2706 path)));
2708
2709 /* Also sync the parent directory */
2711 fsync_fname(slotdir, true);
2713
2714 /* read part of statefile that's guaranteed to be version independent */
2715 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2716 readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2718 if (readBytes != ReplicationSlotOnDiskConstantSize)
2719 {
2720 if (readBytes < 0)
2721 ereport(PANIC,
2723 errmsg("could not read file \"%s\": %m", path)));
2724 else
2725 ereport(PANIC,
2727 errmsg("could not read file \"%s\": read %d of %zu",
2728 path, readBytes,
2730 }
2731
2732 /* verify magic */
2733 if (cp.magic != SLOT_MAGIC)
2734 ereport(PANIC,
2736 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2737 path, cp.magic, SLOT_MAGIC)));
2738
2739 /* verify version */
2740 if (cp.version != SLOT_VERSION)
2741 ereport(PANIC,
2743 errmsg("replication slot file \"%s\" has unsupported version %u",
2744 path, cp.version)));
2745
2746 /* boundary check on length */
2748 ereport(PANIC,
2750 errmsg("replication slot file \"%s\" has corrupted length %u",
2751 path, cp.length)));
2752
2753 /* Now that we know the size, read the entire file */
2754 pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2755 readBytes = read(fd,
2756 (char *) &cp + ReplicationSlotOnDiskConstantSize,
2757 cp.length);
2759 if (readBytes != cp.length)
2760 {
2761 if (readBytes < 0)
2762 ereport(PANIC,
2764 errmsg("could not read file \"%s\": %m", path)));
2765 else
2766 ereport(PANIC,
2768 errmsg("could not read file \"%s\": read %d of %zu",
2769 path, readBytes, (Size) cp.length)));
2770 }
2771
2772 if (CloseTransientFile(fd) != 0)
2773 ereport(PANIC,
2775 errmsg("could not close file \"%s\": %m", path)));
2776
2777 /* now verify the CRC */
2778 INIT_CRC32C(checksum);
2779 COMP_CRC32C(checksum,
2782 FIN_CRC32C(checksum);
2783
2784 if (!EQ_CRC32C(checksum, cp.checksum))
2785 ereport(PANIC,
2786 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2787 path, checksum, cp.checksum)));
2788
2789 /*
2790 * If we crashed with an ephemeral slot active, don't restore but delete
2791 * it.
2792 */
2794 {
2795 if (!rmtree(slotdir, true))
2796 {
2798 (errmsg("could not remove directory \"%s\"",
2799 slotdir)));
2800 }
2802 return;
2803 }
2804
2805 /*
2806 * Verify that requirements for the specific slot type are met. That's
2807 * important because if these aren't met we're not guaranteed to retain
2808 * all the necessary resources for the slot.
2809 *
2810 * NB: We have to do so *after* the above checks for ephemeral slots,
2811 * because otherwise a slot that shouldn't exist anymore could prevent
2812 * restarts.
2813 *
2814 * NB: Changing the requirements here also requires adapting
2815 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2816 */
2817 if (cp.slotdata.database != InvalidOid)
2818 {
2820 ereport(FATAL,
2821 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2822 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2823 NameStr(cp.slotdata.name)),
2824 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2825
2826 /*
2827 * In standby mode, the hot standby must be enabled. This check is
2828 * necessary to ensure logical slots are invalidated when they become
2829 * incompatible due to insufficient wal_level. Otherwise, if the
2830 * primary reduces effective_wal_level < logical while hot standby is
2831 * disabled, primary disable logical decoding while hot standby is
2832 * disabled, logical slots would remain valid even after promotion.
2833 */
2835 ereport(FATAL,
2836 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2837 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2838 NameStr(cp.slotdata.name)),
2839 errhint("Change \"hot_standby\" to be \"on\".")));
2840 }
2841 else if (wal_level < WAL_LEVEL_REPLICA)
2842 ereport(FATAL,
2843 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2844 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2845 NameStr(cp.slotdata.name)),
2846 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2847
2848 /* nothing can be active yet, don't lock anything */
2849 for (i = 0; i < max_replication_slots; i++)
2850 {
2851 ReplicationSlot *slot;
2852
2854
2855 if (slot->in_use)
2856 continue;
2857
2858 /* restore the entire set of persistent data */
2859 memcpy(&slot->data, &cp.slotdata,
2861
2862 /* initialize in memory state */
2863 slot->effective_xmin = cp.slotdata.xmin;
2867
2872
2873 slot->in_use = true;
2874 slot->active_pid = 0;
2875
2876 /*
2877 * Set the time since the slot has become inactive after loading the
2878 * slot from the disk into memory. Whoever acquires the slot i.e.
2879 * makes the slot active will reset it. Use the same inactive_since
2880 * time for all the slots.
2881 */
2882 if (now == 0)
2884
2886
2887 restored = true;
2888 break;
2889 }
2890
2891 if (!restored)
2892 ereport(FATAL,
2893 (errmsg("too many replication slots active before shutdown"),
2894 errhint("Increase \"max_replication_slots\" and try again.")));
2895}
2896
2897/*
2898 * Maps an invalidation reason for a replication slot to
2899 * ReplicationSlotInvalidationCause.
2900 */
2902GetSlotInvalidationCause(const char *cause_name)
2903{
2904 Assert(cause_name);
2905
2906 /* Search lookup table for the cause having this name */
2907 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2908 {
2909 if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2911 }
2912
2913 Assert(false);
2914 return RS_INVAL_NONE; /* to keep compiler quiet */
2915}
2916
2917/*
2918 * Maps an ReplicationSlotInvalidationCause to the invalidation
2919 * reason for a replication slot.
2920 */
2921const char *
2923{
2924 /* Search lookup table for the name of this cause */
2925 for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2926 {
2927 if (SlotInvalidationCauses[i].cause == cause)
2929 }
2930
2931 Assert(false);
2932 return "none"; /* to keep compiler quiet */
2933}
2934
2935/*
2936 * A helper function to validate slots specified in GUC synchronized_standby_slots.
2937 *
2938 * The rawname will be parsed, and the result will be saved into *elemlist.
2939 */
2940static bool
2941validate_sync_standby_slots(char *rawname, List **elemlist)
2942{
2943 /* Verify syntax and parse string into a list of identifiers */
2944 if (!SplitIdentifierString(rawname, ',', elemlist))
2945 {
2946 GUC_check_errdetail("List syntax is invalid.");
2947 return false;
2948 }
2949
2950 /* Iterate the list to validate each slot name */
2951 foreach_ptr(char, name, *elemlist)
2952 {
2953 int err_code;
2954 char *err_msg = NULL;
2955 char *err_hint = NULL;
2956
2957 if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2958 &err_msg, &err_hint))
2959 {
2960 GUC_check_errcode(err_code);
2961 GUC_check_errdetail("%s", err_msg);
2962 if (err_hint != NULL)
2963 GUC_check_errhint("%s", err_hint);
2964 return false;
2965 }
2966 }
2967
2968 return true;
2969}
2970
2971/*
2972 * GUC check_hook for synchronized_standby_slots
2973 */
2974bool
2976{
2977 char *rawname;
2978 char *ptr;
2979 List *elemlist;
2980 int size;
2981 bool ok;
2983
2984 if ((*newval)[0] == '\0')
2985 return true;
2986
2987 /* Need a modifiable copy of the GUC string */
2988 rawname = pstrdup(*newval);
2989
2990 /* Now verify if the specified slots exist and have correct type */
2991 ok = validate_sync_standby_slots(rawname, &elemlist);
2992
2993 if (!ok || elemlist == NIL)
2994 {
2995 pfree(rawname);
2996 list_free(elemlist);
2997 return ok;
2998 }
2999
3000 /* Compute the size required for the SyncStandbySlotsConfigData struct */
3001 size = offsetof(SyncStandbySlotsConfigData, slot_names);
3002 foreach_ptr(char, slot_name, elemlist)
3003 size += strlen(slot_name) + 1;
3004
3005 /* GUC extra value must be guc_malloc'd, not palloc'd */
3006 config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3007 if (!config)
3008 return false;
3009
3010 /* Transform the data into SyncStandbySlotsConfigData */
3011 config->nslotnames = list_length(elemlist);
3012
3013 ptr = config->slot_names;
3014 foreach_ptr(char, slot_name, elemlist)
3015 {
3016 strcpy(ptr, slot_name);
3017 ptr += strlen(slot_name) + 1;
3018 }
3019
3020 *extra = config;
3021
3022 pfree(rawname);
3023 list_free(elemlist);
3024 return true;
3025}
3026
3027/*
3028 * GUC assign_hook for synchronized_standby_slots
3029 */
3030void
3032{
3033 /*
3034 * The standby slots may have changed, so we must recompute the oldest
3035 * LSN.
3036 */
3038
3040}
3041
3042/*
3043 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3044 */
3045bool
3046SlotExistsInSyncStandbySlots(const char *slot_name)
3047{
3048 const char *standby_slot_name;
3049
3050 /* Return false if there is no value in synchronized_standby_slots */
3052 return false;
3053
3054 /*
3055 * XXX: We are not expecting this list to be long so a linear search
3056 * shouldn't hurt but if that turns out not to be true then we can cache
3057 * this information for each WalSender as well.
3058 */
3059 standby_slot_name = synchronized_standby_slots_config->slot_names;
3060 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3061 {
3062 if (strcmp(standby_slot_name, slot_name) == 0)
3063 return true;
3064
3065 standby_slot_name += strlen(standby_slot_name) + 1;
3066 }
3067
3068 return false;
3069}
3070
3071/*
3072 * Return true if the slots specified in synchronized_standby_slots have caught up to
3073 * the given WAL location, false otherwise.
3074 *
3075 * The elevel parameter specifies the error level used for logging messages
3076 * related to slots that do not exist, are invalidated, or are inactive.
3077 */
3078bool
3079StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
3080{
3081 const char *name;
3082 int caught_up_slot_num = 0;
3083 XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3084
3085 /*
3086 * Don't need to wait for the standbys to catch up if there is no value in
3087 * synchronized_standby_slots.
3088 */
3090 return true;
3091
3092 /*
3093 * Don't need to wait for the standbys to catch up if we are on a standby
3094 * server, since we do not support syncing slots to cascading standbys.
3095 */
3096 if (RecoveryInProgress())
3097 return true;
3098
3099 /*
3100 * Don't need to wait for the standbys to catch up if they are already
3101 * beyond the specified WAL location.
3102 */
3104 ss_oldest_flush_lsn >= wait_for_lsn)
3105 return true;
3106
3107 /*
3108 * To prevent concurrent slot dropping and creation while filtering the
3109 * slots, take the ReplicationSlotControlLock outside of the loop.
3110 */
3111 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3112
3114 for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3115 {
3116 XLogRecPtr restart_lsn;
3117 bool invalidated;
3118 bool inactive;
3119 ReplicationSlot *slot;
3120
3121 slot = SearchNamedReplicationSlot(name, false);
3122
3123 /*
3124 * If a slot name provided in synchronized_standby_slots does not
3125 * exist, report a message and exit the loop.
3126 */
3127 if (!slot)
3128 {
3129 ereport(elevel,
3130 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3131 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3132 name, "synchronized_standby_slots"),
3133 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3134 name),
3135 errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3136 name, "synchronized_standby_slots"));
3137 break;
3138 }
3139
3140 /* Same as above: if a slot is not physical, exit the loop. */
3141 if (SlotIsLogical(slot))
3142 {
3143 ereport(elevel,
3144 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3145 errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3146 name, "synchronized_standby_slots"),
3147 errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3148 name),
3149 errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3150 name, "synchronized_standby_slots"));
3151 break;
3152 }
3153
3154 SpinLockAcquire(&slot->mutex);
3155 restart_lsn = slot->data.restart_lsn;
3156 invalidated = slot->data.invalidated != RS_INVAL_NONE;
3157 inactive = slot->active_pid == 0;
3158 SpinLockRelease(&slot->mutex);
3159
3160 if (invalidated)
3161 {
3162 /* Specified physical slot has been invalidated */
3163 ereport(elevel,
3164 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3165 errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3166 name, "synchronized_standby_slots"),
3167 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3168 name),
3169 errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3170 name, "synchronized_standby_slots"));
3171 break;
3172 }
3173
3174 if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3175 {
3176 /* Log a message if no active_pid for this physical slot */
3177 if (inactive)
3178 ereport(elevel,
3179 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3180 errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3181 name, "synchronized_standby_slots"),
3182 errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3183 name),
3184 errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3185 name, "synchronized_standby_slots"));
3186
3187 /* Continue if the current slot hasn't caught up. */
3188 break;
3189 }
3190
3191 Assert(restart_lsn >= wait_for_lsn);
3192
3193 if (!XLogRecPtrIsValid(min_restart_lsn) ||
3194 min_restart_lsn > restart_lsn)
3195 min_restart_lsn = restart_lsn;
3196
3197 caught_up_slot_num++;
3198
3199 name += strlen(name) + 1;
3200 }
3201
3202 LWLockRelease(ReplicationSlotControlLock);
3203
3204 /*
3205 * Return false if not all the standbys have caught up to the specified
3206 * WAL location.
3207 */
3208 if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3209 return false;
3210
3211 /* The ss_oldest_flush_lsn must not retreat. */
3213 min_restart_lsn >= ss_oldest_flush_lsn);
3214
3215 ss_oldest_flush_lsn = min_restart_lsn;
3216
3217 return true;
3218}
3219
3220/*
3221 * Wait for physical standbys to confirm receiving the given lsn.
3222 *
3223 * Used by logical decoding SQL functions. It waits for physical standbys
3224 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3225 */
3226void
3228{
3229 /*
3230 * Don't need to wait for the standby to catch up if the current acquired
3231 * slot is not a logical failover slot, or there is no value in
3232 * synchronized_standby_slots.
3233 */
3235 return;
3236
3238
3239 for (;;)
3240 {
3242
3244 {
3245 ConfigReloadPending = false;
3247 }
3248
3249 /* Exit if done waiting for every slot. */
3250 if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3251 break;
3252
3253 /*
3254 * Wait for the slots in the synchronized_standby_slots to catch up,
3255 * but use a timeout (1s) so we can also check if the
3256 * synchronized_standby_slots has been changed.
3257 */
3259 WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3260 }
3261
3263}
void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, long *secs, int *microsecs)
Definition: timestamp.c:1721
bool TimestampDifferenceExceedsSeconds(TimestampTz start_time, TimestampTz stop_time, int threshold_sec)
Definition: timestamp.c:1795
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1645
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1609
#define NameStr(name)
Definition: c.h:765
#define ngettext(s, p, n)
Definition: c.h:1158
#define PG_BINARY
Definition: c.h:1250
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:486
uint64_t uint64
Definition: c.h:553
#define pg_unreachable()
Definition: c.h:347
uint32_t uint32
Definition: c.h:552
#define lengthof(array)
Definition: c.h:801
#define MemSet(start, val, len)
Definition: c.h:1011
uint32 TransactionId
Definition: c.h:671
size_t Size
Definition: c.h:624
bool ConditionVariableCancelSleep(void)
bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info)
void ConditionVariableBroadcast(ConditionVariable *cv)
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ConditionVariableInit(ConditionVariable *cv)
void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
int64 TimestampTz
Definition: timestamp.h:39
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1170
int errdetail_internal(const char *fmt,...)
Definition: elog.c:1243
int errcode_for_file_access(void)
Definition: elog.c:886
int errdetail(const char *fmt,...)
Definition: elog.c:1216
int errhint_internal(const char *fmt,...)
Definition: elog.c:1352
int errhint(const char *fmt,...)
Definition: elog.c:1330
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define _(x)
Definition: elog.c:91
#define LOG
Definition: elog.h:31
#define FATAL
Definition: elog.h:41
#define WARNING
Definition: elog.h:36
#define PANIC
Definition: elog.h:42
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
int MakePGDirectory(const char *directoryName)
Definition: fd.c:3959
int FreeDir(DIR *dir)
Definition: fd.c:3005
int CloseTransientFile(int fd)
Definition: fd.c:2851
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:753
DIR * AllocateDir(const char *dirname)
Definition: fd.c:2887
struct dirent * ReadDir(DIR *dir, const char *dirname)
Definition: fd.c:2953
int pg_fsync(int fd)
Definition: fd.c:386
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2674
PGFileType get_dirent_type(const char *path, const struct dirent *de, bool look_through_symlinks, int elevel)
Definition: file_utils.c:547
PGFileType
Definition: file_utils.h:19
@ PGFILETYPE_DIR
Definition: file_utils.h:23
@ PGFILETYPE_ERROR
Definition: file_utils.h:20
bool IsBinaryUpgrade
Definition: globals.c:121
int MyProcPid
Definition: globals.c:47
bool IsUnderPostmaster
Definition: globals.c:120
Oid MyDatabaseId
Definition: globals.c:94
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
void GUC_check_errcode(int sqlerrcode)
Definition: guc.c:6628
void * guc_malloc(int elevel, size_t size)
Definition: guc.c:636
#define newval
#define GUC_check_errdetail
Definition: guc.h:505
GucSource
Definition: guc.h:112
@ PGC_SIGHUP
Definition: guc.h:75
#define GUC_check_errhint
Definition: guc.h:509
Assert(PointerIsAligned(start, uint64))
#define IS_INJECTION_POINT_ATTACHED(name)
#define write(a, b, c)
Definition: win32.h:14
#define read(a, b, c)
Definition: win32.h:13
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:337
int i
Definition: isn.c:77
bool IsLogicalLauncher(void)
Definition: launcher.c:1587
void list_free(List *list)
Definition: list.c:1546
void RequestDisableLogicalDecoding(void)
Definition: logicalctl.c:433
bool LWLockHeldByMe(LWLock *lock)
Definition: lwlock.c:1981
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1178
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:2025
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1898
void LWLockInitialize(LWLock *lock, int tranche_id)
Definition: lwlock.c:698
@ LW_SHARED
Definition: lwlock.h:113
@ LW_EXCLUSIVE
Definition: lwlock.h:112
char * pstrdup(const char *in)
Definition: mcxt.c:1781
void pfree(void *pointer)
Definition: mcxt.c:1616
#define START_CRIT_SECTION()
Definition: miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
@ B_STARTUP
Definition: miscadmin.h:365
#define END_CRIT_SECTION()
Definition: miscadmin.h:152
Oid GetUserId(void)
Definition: miscinit.c:469
BackendType MyBackendType
Definition: miscinit.c:64
bool has_rolreplication(Oid roleid)
Definition: miscinit.c:688
void namestrcpy(Name name, const char *str)
Definition: name.c:233
void * arg
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:42
#define NAMEDATALEN
#define MAXPGPATH
uint32 pg_crc32c
Definition: pg_crc32c.h:38
#define COMP_CRC32C(crc, data, len)
Definition: pg_crc32c.h:153
#define EQ_CRC32C(c1, c2)
Definition: pg_crc32c.h:42
#define INIT_CRC32C(crc)
Definition: pg_crc32c.h:41
#define FIN_CRC32C(crc)
Definition: pg_crc32c.h:158
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
static bool failover
static rewind_source * source
Definition: pg_rewind.c:89
void pgstat_create_replslot(ReplicationSlot *slot)
void pgstat_acquire_replslot(ReplicationSlot *slot)
void pgstat_drop_replslot(ReplicationSlot *slot)
#define sprintf
Definition: port.h:262
#define snprintf
Definition: port.h:260
uint64_t Datum
Definition: postgres.h:70
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void ProcArraySetReplicationSlotXmin(TransactionId xmin, TransactionId catalog_xmin, bool already_locked)
Definition: procarray.c:3903
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:284
@ PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT
Definition: procsignal.h:46
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
bool rmtree(const char *path, bool rmtopdir)
Definition: rmtree.c:50
Size add_size(Size s1, Size s2)
Definition: shmem.c:495
Size mul_size(Size s1, Size s2)
Definition: shmem.c:510
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:389
int ReplicationSlotIndex(ReplicationSlot *slot)
Definition: slot.c:579
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition: slot.c:626
static bool InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, bool *released_lock_out)
Definition: slot.c:1958
static const SlotInvalidationCauseMap SlotInvalidationCauses[]
Definition: slot.c:113
char * synchronized_standby_slots
Definition: slot.c:164
void assign_synchronized_standby_slots(const char *newval, void *extra)
Definition: slot.c:3031
#define ReplicationSlotOnDiskChecksummedSize
Definition: slot.c:135
void CheckPointReplicationSlots(bool is_shutdown)
Definition: slot.c:2296
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:384
int idle_replication_slot_timeout_secs
Definition: slot.c:158
void ReplicationSlotDropAcquired(void)
Definition: slot.c:1027
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1169
static ReplicationSlotInvalidationCause DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, TimestampTz *inactive_since, TimestampTz now)
Definition: slot.c:1866
void ReplicationSlotReserveWal(void)
Definition: slot.c:1689
bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
Definition: slot.c:1442
struct SlotInvalidationCauseMap SlotInvalidationCauseMap
static XLogRecPtr ss_oldest_flush_lsn
Definition: slot.c:173
bool ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name, int *err_code, char **err_msg, char **err_hint)
Definition: slot.c:311
void ReplicationSlotsDropDBSlots(Oid dboid)
Definition: slot.c:1503
#define ReplicationSlotOnDiskNotChecksummedSize
Definition: slot.c:132
XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void)
Definition: slot.c:1363
ReplicationSlotInvalidationCause GetSlotInvalidationCause(const char *cause_name)
Definition: slot.c:2902
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1211
static void RestoreSlotFromDisk(const char *name)
Definition: slot.c:2659
void ReplicationSlotPersist(void)
Definition: slot.c:1186
bool CheckLogicalSlotExists(void)
Definition: slot.c:1608
static void ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, bool terminating, int pid, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, long slot_idle_seconds)
Definition: slot.c:1768
ReplicationSlot * MyReplicationSlot
Definition: slot.c:148
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
Definition: slot.c:2496
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:915
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:3046
static bool validate_sync_standby_slots(char *rawname, List **elemlist)
Definition: slot.c:2941
void ReplicationSlotSave(void)
Definition: slot.c:1151
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:546
static void CreateSlotOnDisk(ReplicationSlot *slot)
Definition: slot.c:2435
#define ReplicationSlotOnDiskV2Size
Definition: slot.c:138
void CheckSlotPermissions(void)
Definition: slot.c:1672
bool ReplicationSlotName(int index, Name name)
Definition: slot.c:595
bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
Definition: slot.c:2975
void ReplicationSlotsShmemInit(void)
Definition: slot.c:206
bool ReplicationSlotValidateName(const char *name, bool allow_reserved_name, int elevel)
Definition: slot.c:266
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:945
void ReplicationSlotRelease(void)
Definition: slot.c:764
int max_replication_slots
Definition: slot.c:151
StaticAssertDecl(lengthof(SlotInvalidationCauses)==(RS_INVAL_MAX_CAUSES+1), "array length mismatch")
ReplicationSlotCtlData * ReplicationSlotCtl
Definition: slot.c:145
#define SLOT_VERSION
Definition: slot.c:142
struct ReplicationSlotOnDisk ReplicationSlotOnDisk
void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
Definition: slot.c:3227
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:3079
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1293
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:863
void ReplicationSlotInitialize(void)
Definition: slot.c:241
static void ReplicationSlotDropPtr(ReplicationSlot *slot)
Definition: slot.c:1044
void StartupReplicationSlots(void)
Definition: slot.c:2374
static bool CanInvalidateIdleSlot(ReplicationSlot *s)
Definition: slot.c:1850
void CheckSlotRequirements(void)
Definition: slot.c:1650
#define SLOT_MAGIC
Definition: slot.c:141
bool InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon)
Definition: slot.c:2192
static SyncStandbySlotsConfigData * synchronized_standby_slots_config
Definition: slot.c:167
#define ReplicationSlotOnDiskConstantSize
Definition: slot.c:129
Size ReplicationSlotsShmemSize(void)
Definition: slot.c:188
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition: slot.c:2922
static void ReplicationSlotShmemExit(int code, Datum arg)
Definition: slot.c:250
static bool IsSlotForConflictCheck(const char *name)
Definition: slot.c:361
#define CONFLICT_DETECTION_SLOT
Definition: slot.h:28
#define RS_INVAL_MAX_CAUSES
Definition: slot.h:72
ReplicationSlotPersistency
Definition: slot.h:44
@ RS_PERSISTENT
Definition: slot.h:45
@ RS_EPHEMERAL
Definition: slot.h:46
@ RS_TEMPORARY
Definition: slot.h:47
#define SlotIsPhysical(slot)
Definition: slot.h:284
#define PG_REPLSLOT_DIR
Definition: slot.h:21
ReplicationSlotInvalidationCause
Definition: slot.h:59
@ RS_INVAL_WAL_REMOVED
Definition: slot.h:62
@ RS_INVAL_IDLE_TIMEOUT
Definition: slot.h:68
@ RS_INVAL_HORIZON
Definition: slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition: slot.h:66
@ RS_INVAL_NONE
Definition: slot.h:60
#define SlotIsLogical(slot)
Definition: slot.h:285
static void ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts, bool acquire_lock)
Definition: slot.h:303
@ SS_SKIP_NONE
Definition: slot.h:82
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1882
#define SpinLockInit(lock)
Definition: spin.h:57
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
PGPROC * MyProc
Definition: proc.c:67
PROC_HDR * ProcGlobal
Definition: proc.c:79
XLogRecPtr LogStandbySnapshot(void)
Definition: standby.c:1282
#define ERRCODE_DUPLICATE_OBJECT
Definition: streamutil.c:30
bool pg_str_endswith(const char *str, const char *end)
Definition: string.c:31
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: dirent.c:26
Definition: pg_list.h:54
uint8 statusFlags
Definition: proc.h:259
int pgxactoff
Definition: proc.h:201
uint8 * statusFlags
Definition: proc.h:403
ReplicationSlot replication_slots[1]
Definition: slot.h:296
uint32 version
Definition: slot.c:75
ReplicationSlotPersistentData slotdata
Definition: slot.c:83
pg_crc32c checksum
Definition: slot.c:72
TransactionId xmin
Definition: slot.h:114
TransactionId catalog_xmin
Definition: slot.h:122
XLogRecPtr confirmed_flush
Definition: slot.h:136
ReplicationSlotPersistency persistency
Definition: slot.h:106
ReplicationSlotInvalidationCause invalidated
Definition: slot.h:128
XLogRecPtr candidate_xmin_lsn
Definition: slot.h:226
TransactionId effective_catalog_xmin
Definition: slot.h:207
slock_t mutex
Definition: slot.h:183
XLogRecPtr candidate_restart_valid
Definition: slot.h:227
XLogRecPtr last_saved_confirmed_flush
Definition: slot.h:235
pid_t active_pid
Definition: slot.h:189
SlotSyncSkipReason slotsync_skip_reason
Definition: slot.h:281
bool in_use
Definition: slot.h:186
TransactionId effective_xmin
Definition: slot.h:206
bool just_dirtied
Definition: slot.h:192
XLogRecPtr last_saved_restart_lsn
Definition: slot.h:268
XLogRecPtr candidate_restart_lsn
Definition: slot.h:228
LWLock io_in_progress_lock
Definition: slot.h:213
ConditionVariable active_cv
Definition: slot.h:216
TransactionId candidate_catalog_xmin
Definition: slot.h:225
bool dirty
Definition: slot.h:193
ReplicationSlotPersistentData data
Definition: slot.h:210
TimestampTz inactive_since
Definition: slot.h:242
const char * cause_name
Definition: slot.c:110
ReplicationSlotInvalidationCause cause
Definition: slot.c:109
char slot_names[FLEXIBLE_ARRAY_MEMBER]
Definition: slot.c:101
ConditionVariable wal_confirm_rcv_cv
Definition: dirent.h:10
char d_name[MAX_PATH]
Definition: dirent.h:15
Definition: type.h:96
Definition: c.h:760
unsigned short st_mode
Definition: win32_port.h:258
#define InvalidTransactionId
Definition: transam.h:31
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.h:282
#define TransactionIdIsValid(xid)
Definition: transam.h:41
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.h:263
bool SplitIdentifierString(char *rawstring, char separator, List **namelist)
Definition: varlena.c:2728
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:69
static void pgstat_report_wait_end(void)
Definition: wait_event.h:85
const char * name
bool am_walsender
Definition: walsender.c:123
bool log_replication_commands
Definition: walsender.c:133
WalSndCtlData * WalSndCtl
Definition: walsender.c:117
#define stat
Definition: win32_port.h:274
#define S_ISDIR(m)
Definition: win32_port.h:315
#define kill(pid, sig)
Definition: win32_port.h:490
bool RecoveryInProgress(void)
Definition: xlog.c:6443
XLogSegNo XLogGetLastRemovedSegno(void)
Definition: xlog.c:3780
bool EnableHotStandby
Definition: xlog.c:124
XLogRecPtr GetRedoRecPtr(void)
Definition: xlog.c:6546
int wal_level
Definition: xlog.c:134
int wal_segment_size
Definition: xlog.c:146
void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
Definition: xlog.c:2670
XLogRecPtr GetXLogInsertRecPtr(void)
Definition: xlog.c:9578
void XLogFlush(XLogRecPtr record)
Definition: xlog.c:2784
@ WAL_LEVEL_REPLICA
Definition: xlog.h:76
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition: xlogdefs.h:29
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:47
uint64 XLogRecPtr
Definition: xlogdefs.h:21
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
uint64 XLogSegNo
Definition: xlogdefs.h:52
bool StandbyMode
Definition: xlogrecovery.c:150
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)