diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 7de03c79f6f0..4475260286eb 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -466,3 +466,35 @@ SELECT pg_drop_replication_slot('physical_slot'); (1 row) +-- Test logical slots with no WAL reservation. +SELECT 'init' FROM pg_create_logical_replication_slot('slot_nowal', 'test_decoding', false, false, false, false); + ?column? +---------- + init +(1 row) + +-- No WAL reserved. +SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + restart_lsn | confirmed_flush_lsn +-------------+--------------------- + | +(1 row) + +-- Use this slot and check the WAL reservation again. +SELECT data FROM pg_logical_slot_get_changes('slot_nowal', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +SELECT restart_lsn is not null as valid_restart_lsn, confirmed_flush_lsn is not null as valid_confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + valid_restart_lsn | valid_confirmed_flush_lsn +-------------------+--------------------------- + t | t +(1 row) + +SELECT pg_drop_replication_slot('slot_nowal'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 580e3ae3befa..3da5d6eab9b6 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -190,3 +190,15 @@ SELECT pg_drop_replication_slot('failover_true_slot'); SELECT pg_drop_replication_slot('failover_false_slot'); SELECT pg_drop_replication_slot('failover_default_slot'); SELECT pg_drop_replication_slot('physical_slot'); + +-- Test logical slots with no WAL reservation. +SELECT 'init' FROM pg_create_logical_replication_slot('slot_nowal', 'test_decoding', false, false, false, false); + +-- No WAL reserved. 
+SELECT restart_lsn, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + +-- Use this slot and check the WAL reservation again. +SELECT data FROM pg_logical_slot_get_changes('slot_nowal', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT restart_lsn is not null as valid_restart_lsn, confirmed_flush_lsn is not null as valid_confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = 'slot_nowal'; + +SELECT pg_drop_replication_slot('slot_nowal'); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 47ffc0a23077..55071042341a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -78,6 +78,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/origin.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/walreceiver.h" @@ -141,6 +142,7 @@ bool XLOG_DEBUG = false; #endif int wal_segment_size = DEFAULT_XLOG_SEG_SIZE; +int effective_wal_level = WAL_LEVEL_REPLICA; /* * Number of WAL insertion locks to use. A higher value allows more insertions @@ -5011,6 +5013,48 @@ show_in_hot_standby(void) return RecoveryInProgress() ? "on" : "off"; } +/* + * GUC show_hook for effective_wal_level + */ +const char * +show_effective_wal_level(void) +{ + char *str; + + if (wal_level == WAL_LEVEL_MINIMAL) + return "minimal"; + + /* + * During the recovery, we synchronously update the XLogLogicalInfo so + * need to check the shared state. + */ + if (RecoveryInProgress()) + return IsXLogLogicalInfoEnabled() ? 
"logical" : "replica"; + + if (wal_level == WAL_LEVEL_REPLICA) + { + bool in_transition; + + /* check if the logical decoding status is being changed */ + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + in_transition = LogicalDecodingCtl->transition_in_progress; + LWLockRelease(LogicalDecodingControlLock); + + /* + * With wal_level='replica', XLogLogicalInfo indicates the actual WAL + * level unless we're in the status change. + */ + if (XLogLogicalInfo && !in_transition) + str = "logical"; + else + str = "replica"; + } + else + str = "logical"; + + return str; +} + /* * Read the control file, set respective GUCs. * @@ -5779,6 +5823,12 @@ StartupXLOG(void) */ RelationCacheInitFileRemove(); + /* + * Startup the logical decoding status, needs to be setup before + * initializing replication slots as it requires logical decoding status. + */ + StartupLogicalDecodingStatus(ControlFile->logicalDecodingEnabled); + /* * Initialize replication slots, before there's a chance to remove * required resources. @@ -6314,6 +6364,8 @@ StartupXLOG(void) Insert->fullPageWrites = lastFullPageWrites; UpdateFullPageWrites(); + UpdateLogicalDecodingStatusEndOfRecovery(); + /* * Emit checkpoint or end-of-recovery record in XLOG, if required. */ @@ -7453,6 +7505,8 @@ CreateCheckPoint(int flags) */ ControlFile->unloggedLSN = pg_atomic_read_membarrier_u64(&XLogCtl->unloggedLSN); + ControlFile->logicalDecodingEnabled = IsLogicalDecodingEnabled(); + UpdateControlFile(); LWLockRelease(ControlFileLock); @@ -8694,19 +8748,26 @@ xlog_redo(XLogReaderState *record) memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); /* - * Invalidate logical slots if we are in hot standby and the primary - * does not have a WAL level sufficient for logical decoding. 
No need - * to search for potentially conflicting logically slots if standby is - * running with wal_level lower than logical, because in that case, we - * would have either disallowed creation of logical slots or - * invalidated existing ones. + * Change the logical decoding status upon wal_level change on the + * primary server. */ - if (InRecovery && InHotStandby && - xlrec.wal_level < WAL_LEVEL_LOGICAL && - wal_level >= WAL_LEVEL_LOGICAL) - InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, - 0, InvalidOid, - InvalidTransactionId); + if (xlrec.wal_level == WAL_LEVEL_LOGICAL) + { + /* + * If the primary increase WAL level to 'logical', we can + * unconditionally enable the logical decoding on the standby. + */ + UpdateLogicalDecodingStatus(true); + } + else if (xlrec.wal_level == WAL_LEVEL_REPLICA && + pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) == 0) + { + /* + * Disable the logical decoding if there is no in-use logical slot + * on the standby. + */ + UpdateLogicalDecodingStatus(false); + } LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; @@ -8716,6 +8777,7 @@ xlog_redo(XLogReaderState *record) ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact; ControlFile->wal_level = xlrec.wal_level; ControlFile->wal_log_hints = xlrec.wal_log_hints; + ControlFile->logicalDecodingEnabled = IsLogicalDecodingEnabled(); /* * Update minRecoveryPoint to ensure that if recovery is aborted, we @@ -8775,6 +8837,27 @@ xlog_redo(XLogReaderState *record) { /* nothing to do here, just for informational purposes */ } + else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE) + { + bool logical_decoding; + + memcpy(&logical_decoding, XLogRecGetData(record), sizeof(bool)); + UpdateLogicalDecodingStatus(logical_decoding); + + /* + * Invalidate logical slots if we are in hot standby and the primary + * disabled the logical decoding. 
+ */ + if (!logical_decoding && InRecovery && InHotStandby) + InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, + 0, InvalidOid, + InvalidTransactionId); + + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + ControlFile->logicalDecodingEnabled = logical_decoding; + UpdateControlFile(); + LWLockRelease(ControlFileLock); + } } /* diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 566f308e4439..5a801f5f3bb7 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -480,6 +480,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, IN failover boolean DEFAULT false, + IN immediately_reserve boolean DEFAULT true, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 0b23d94c38e2..75b2e2fdc872 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -38,6 +38,7 @@ #include "parser/parse_clause.h" #include "parser/parse_collate.h" #include "parser/parse_relation.h" +#include "replication/logicalctl.h" #include "rewrite/rewriteHandler.h" #include "storage/lmgr.h" #include "utils/acl.h" @@ -960,11 +961,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); - if (wal_level != WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("\"wal_level\" is insufficient to publish logical changes"), - errhint("Set \"wal_level\" to \"logical\" before creating subscriptions."))); + errmsg("logical decoding needs to be enabled to publish logical changes"), + errhint("Set \"wal_level\" to \"logical\" or create a logical replication slot with \"replica\" \"wal_level\" before creating 
subscriptions."))); return myself; } diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 1e08bbbd4eb1..50ec127e9efc 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -20,6 +20,7 @@ OBJS = \ decode.o \ launcher.o \ logical.o \ + logicalctl.o \ logicalfuncs.o \ message.o \ origin.o \ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index cc03f0706e9c..269be167be54 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -150,21 +150,30 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ break; case XLOG_PARAMETER_CHANGE: + + /* + * XXX: even if wal_level on the primary got decreased to + * 'replica' it doesn't necessarily mean to disable the logical + * decoding as long as we have at least one logical slot. So we + * can ignore wal_level change here. + */ + break; + case XLOG_NOOP: + case XLOG_NEXTOID: + case XLOG_SWITCH: + case XLOG_BACKUP_END: + case XLOG_RESTORE_POINT: + case XLOG_FPW_CHANGE: + case XLOG_FPI_FOR_HINT: + case XLOG_FPI: + case XLOG_OVERWRITE_CONTRECORD: + case XLOG_CHECKPOINT_REDO: + break; + case XLOG_LOGICAL_DECODING_STATUS_CHANGE: { - xl_parameter_change *xlrec = - (xl_parameter_change *) XLogRecGetData(buf->record); + bool *logical_decoding = (bool *) XLogRecGetData(buf->record); - /* - * If wal_level on the primary is reduced to less than - * logical, we want to prevent existing logical slots from - * being used. Existing logical slots on the standby get - * invalidated when this WAL record is replayed; and further, - * slot creation fails when wal_level is not sufficient; but - * all these operations are not synchronized, so a logical - * slot may creep in while the wal_level is being reduced. - * Hence this extra check. 
- */ - if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + if (!(*logical_decoding)) { /* * This can occur only on a standby, as a primary would @@ -174,20 +183,9 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Assert(RecoveryInProgress()); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); + errmsg("logical decoding must be enabled on the primary"))); } - break; } - case XLOG_NOOP: - case XLOG_NEXTOID: - case XLOG_SWITCH: - case XLOG_BACKUP_END: - case XLOG_RESTORE_POINT: - case XLOG_FPW_CHANGE: - case XLOG_FPI_FOR_HINT: - case XLOG_FPI: - case XLOG_OVERWRITE_CONTRECORD: - case XLOG_CHECKPOINT_REDO: break; default: elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index f1eb798f3e97..0d7d9189de4c 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -36,6 +36,7 @@ #include "pgstat.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/reorderbuffer.h" #include "replication/slotsync.h" #include "replication/snapbuild.h" @@ -117,31 +118,24 @@ CheckLogicalDecodingRequirements(void) * needs the same check. 
*/ - if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding requires \"wal_level\" >= \"logical\""))); + if (!IsLogicalDecodingEnabled()) + { + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding needs to be enabled on the primary"), + errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot on the primary."))); + else + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding is not enabled"), + errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot."))); + } if (MyDatabaseId == InvalidOid) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - - if (RecoveryInProgress()) - { - /* - * This check may have race conditions, but whenever - * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we - * verify that there are no existing logical replication slots. And to - * avoid races around creating a new slot, - * CheckLogicalDecodingRequirements() is called once before creating - * the slot, and once when logical decoding is initially starting up. - */ - if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); - } } /* @@ -305,6 +299,40 @@ StartupDecodingContext(List *output_plugin_options, return ctx; } +/* + * Create the logical decoding and initialize it if necessary. This function + * can be used for logical slot that might not have been initialized yet. 
+ */ +LogicalDecodingContext * +CreateOrInitDecodingContext(XLogRecPtr restart_lsn, + List *output_plugin_options, + bool fast_foward, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress) +{ + LogicalDecodingContext *ctx; + + /* Initialize the slot with a new logical decoding if not yet */ + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) + { + ctx = CreateInitDecodingContext(NameStr(MyReplicationSlot->data.plugin), + output_plugin_options, false, + restart_lsn, xl_routine, + prepare_write, do_write, update_progress); + + DecodingContextFindStartpoint(ctx); + + FreeDecodingContext(ctx); + } + + ctx = CreateDecodingContext(restart_lsn, output_plugin_options, fast_foward, + xl_routine, prepare_write, do_write, update_progress); + + return ctx; +} + /* * Create a new decoding context, for a new logical slot. * diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c new file mode 100644 index 000000000000..b5f059329a23 --- /dev/null +++ b/src/backend/replication/logical/logicalctl.c @@ -0,0 +1,401 @@ +/*------------------------------------------------------------------------- + * logicalctl.c + * Functionality to control logical decoding status. 
+ * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/replication/logical/logicalctl.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/xlog_internal.h" +#include "access/xlogutils.h" +#include "access/xloginsert.h" +#include "catalog/pg_control.h" +#include "port/atomics.h" +#include "miscadmin.h" +#include "storage/lwlock.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/shmem.h" +#include "storage/standby.h" +#include "replication/logicalctl.h" +#include "replication/slot.h" +#include "utils/guc.h" +#include "utils/wait_event_types.h" + +LogicalDecodingCtlData *LogicalDecodingCtl = NULL; + +/* + * Process local cache of LogicalDecodingCtl->xlog_logical_info. This is + * initialized at process startup time, and could be updated when absorbing + * process barrier in ProcessBarrierUpdateXLogLogicalInfo(). + */ +bool XLogLogicalInfo = false; + +Size +LogicalDecodingCtlShmemSize(void) +{ + return sizeof(LogicalDecodingCtlData); +} + +void +LogicalDecodingCtlShmemInit(void) +{ + bool found; + + LogicalDecodingCtl = ShmemInitStruct("Logical information control", + LogicalDecodingCtlShmemSize(), + &found); + + if (!found) + { + LogicalDecodingCtl->transition_in_progress = false; + ConditionVariableInit(&LogicalDecodingCtl->transition_cv); + pg_atomic_init_flag(&LogicalDecodingCtl->xlog_logical_info); + pg_atomic_init_flag(&LogicalDecodingCtl->logical_decoding_enabled); + } +} + +/* + * Initialize the logical decoding status on shmem at server startup. This + * must be called ONCE during postmaster or standalone-backend startup, + * before initializing replication slots. 
+ */ +void +StartupLogicalDecodingStatus(bool status_in_control_file) +{ + if (wal_level == WAL_LEVEL_MINIMAL) + return; + + /* + * If the logical decoding was enabled before the last shutdown, we + * continue enabling it as we might have set wal_level='logical' or have a + * few logical slots. On primary, wal_level setting can overwrite the + * status. + */ + if (status_in_control_file || + (!StandbyMode && wal_level >= WAL_LEVEL_LOGICAL)) + { + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info)); + } +} + +/* + * Update the XLogLogicalInfo cache. + */ +static void +update_xlog_logical_info(void) +{ + XLogLogicalInfo = IsXLogLogicalInfoEnabled(); +} + +/* + * Initialize XLogLogicalInfo backend-private cache. + */ +void +InitializeProcessXLogLogicalInfo(void) +{ + update_xlog_logical_info(); +} + +bool +ProcessBarrierUpdateXLogLogicalInfo(void) +{ + update_xlog_logical_info(); + return true; +} + +/* + * Check the shared memory state and return true if the logical decoding is + * enabled on the system. + */ +bool +IsLogicalDecodingEnabled(void) +{ + return !pg_atomic_unlocked_test_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); +} + +/* + * Check the shared memory state and return true if WAL logging logical + * information is enabled. + */ +bool +IsXLogLogicalInfoEnabled(void) +{ + return !pg_atomic_unlocked_test_flag(&(LogicalDecodingCtl->xlog_logical_info)); +} + +/* + * Enable/Disable both status of logical info WAL logging and logical decoding + * on shared memory based on the given new status. 
+ */ + +void +UpdateLogicalDecodingStatus(bool new_status) +{ + if (new_status) + { + pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info)); + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + } + else + { + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info)); + } + + elog(DEBUG1, "update logical decoding status to %d", new_status); +} + +/* + * A PG_ENSURE_ERROR_CLEANUP callback for making the logical decoding enabled. + */ +static void +abort_enabling_logical_decoding(int code, Datum arg) +{ + Assert(LogicalDecodingCtl->transition_in_progress); + + elog(DEBUG1, "aborting the process of enabling logical decoding"); + + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info)); + + /* XXX really no need to wait here? */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = false; + LWLockRelease(LogicalDecodingControlLock); + + /* Let waiters know the WAL level change completed */ + ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv); +} + +/* + * Enable the logical decoding if disabled. 
+ */ +void +EnsureLogicalDecodingEnabled(void) +{ + if (IsLogicalDecodingEnabled()) + return; + + /* Standby cannot change the logical decoding status */ + if (RecoveryInProgress()) + return; + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + while (LogicalDecodingCtl->transition_in_progress) + { + LWLockRelease(LogicalDecodingControlLock); + /* Wait for someone to complete the transition */ + ConditionVariableSleep(&LogicalDecodingCtl->transition_cv, + WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE); + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + } + + /* Leave the CV's wait list; this is a no-op if we never slept */ + ConditionVariableCancelSleep(); + + if (IsLogicalDecodingEnabled()) + { + LWLockRelease(LogicalDecodingControlLock); + return; + } + + LogicalDecodingCtl->transition_in_progress = true; + LWLockRelease(LogicalDecodingControlLock); + + PG_ENSURE_ERROR_CLEANUP(abort_enabling_logical_decoding, (Datum) 0); + { + RunningTransactions running; + + /* + * Set logical info WAL logging on the shmem. All process starts after + * this point will include the information required by the logical + * decoding to WAL records. + */ + pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info)); + + running = GetRunningTransactionData(); + LWLockRelease(ProcArrayLock); + LWLockRelease(XidGenLock); + + /* + * Order all running processes to reflect the xlog_logical_info + * update, and wait. + */ + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); + + /* + * Wait for all running transactions to finish as some transaction + * might have started with the old state. + */ + for (int i = 0; i < running->xcnt; i++) + { + TransactionId xid = running->xids[i]; + + if (TransactionIdIsCurrentTransactionId(xid)) + continue; + + XactLockTableWait(xid, NULL, NULL, XLTW_None); + } + + /* + * Here, we can ensure that all running transactions are using the new + * xlog_logical_info value, writing logical information to WAL + * records. So now enable the logical decoding globally. 
+ */ + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = false; + LWLockRelease(LogicalDecodingControlLock); + } + PG_END_ENSURE_ERROR_CLEANUP(abort_enabling_logical_decoding, (Datum) 0); + + /* Let waiters know the work finished */ + ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv); + + if (XLogStandbyInfoActive() && !RecoveryInProgress()) + { + XLogRecPtr recptr; + bool logical_decoding = true; + + XLogBeginInsert(); + XLogRegisterData(&logical_decoding, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); + } + + ereport(LOG, + (errmsg("logical decoding is enabled upon creating a new logical replication slot"))); +} + +/* + * Disable the logical decoding if enabled. + * + * XXX: This function could write a WAL record in order to tell the standbys + * know the logical decoding got disabled. However, we need to note that this + * function could be called during process exits (e.g., by ReplicationSlotCleanup() + * via before_shmem_exit callbacks), which looks something that we want to + * avoid. 
+ */ +void +DisableLogicalDecodingIfNecessary(void) +{ + /* Standby cannot change the logical decoding status */ + if (RecoveryInProgress()) + return; + + if (wal_level >= WAL_LEVEL_LOGICAL || !IsLogicalDecodingEnabled()) + return; + + if (pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) > 0) + return; + + if (XLogStandbyInfoActive() && !RecoveryInProgress()) + { + bool logical_decoding = false; + XLogRecPtr recptr; + + XLogBeginInsert(); + XLogRegisterData(&logical_decoding, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); + } + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = true; + LWLockRelease(LogicalDecodingControlLock); + + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info)); + + /* + * XXX is it okay not to wait for the signal to be absorbed? + */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); + + /* XXX need to wait for transaction finishes? */ + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = false; + LWLockRelease(LogicalDecodingControlLock); + + /* Let waiters know the work finished */ + ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv); + + ereport(LOG, + (errmsg("logical decoding is disabled because all logical replication slots are removed"))); +} + +/* + * Update the logical decoding status at end of the recovery. This function + * must be called ONCE before accepting writes. 
+ */ +void +UpdateLogicalDecodingStatusEndOfRecovery(void) +{ + bool new_status = false; + + if (!IsUnderPostmaster) + return; + + if (wal_level == WAL_LEVEL_MINIMAL) + return; + +#ifdef USE_ASSERT_CHECKING + { + bool xlog_logical_info = IsXLogLogicalInfoEnabled(); + bool logical_decoding = IsLogicalDecodingEnabled(); + + /* Verify we're not in intermediate status */ + Assert((xlog_logical_info && logical_decoding) || + (!xlog_logical_info && !logical_decoding)); + } +#endif + + /* + * We can use logical decoding if we're using 'logical' WAL level or there + * is at least one logical replication slot. + */ + if (wal_level == WAL_LEVEL_LOGICAL || + pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) > 0) + new_status = true; + + /* Update the status on shmem if needed */ + if (IsLogicalDecodingEnabled() != new_status) + { + XLogRecPtr recptr; + + UpdateLogicalDecodingStatus(new_status); + + XLogBeginInsert(); + XLogRegisterData(&new_status, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); + + elog(DEBUG1, "update logical decoding status to %d at end of recovery", + new_status); + } + + /* + * Ensure all running processes have the updated status. We don't need to + * wait for running transactions to finish as we don't accept any writes + * yet. We need the wait even if we've not updated the status above as the + * status may have been turned on and off during recovery, leaving running + * processes with different values in their local caches. 
+ */ + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); +} diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index ca53caac2f2f..8469280a37b9 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -202,14 +202,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin PG_TRY(); { /* restart at slot's confirmed_flush */ - ctx = CreateDecodingContext(InvalidXLogRecPtr, - options, - false, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - LogicalOutputPrepareWrite, - LogicalOutputWrite, NULL); + ctx = CreateOrInitDecodingContext(InvalidXLogRecPtr, + options, + false, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + LogicalOutputPrepareWrite, + LogicalOutputWrite, NULL); MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 6f19614c79d8..19c7130b961c 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -6,6 +6,7 @@ backend_sources += files( 'decode.c', 'launcher.c', 'logical.c', + 'logicalctl.c', 'logicalfuncs.c', 'message.c', 'origin.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index f1dcbebfa1ae..8f13b80ac044 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -57,6 +57,7 @@ #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/snapbuild.h" #include "storage/ipc.h" @@ -1058,15 +1059,16 @@ bool ValidateSlotSyncParams(int elevel) { /* - * Logical 
slot sync/creation requires wal_level >= logical. - * - * Since altering the wal_level requires a server restart, so error out in - * this case regardless of elevel provided by caller. + * Logical slot sync/creation requires logical decoding to be enabled; if + * it is not, report at the caller's elevel and return false. */ - if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\"")); + if (!IsLogicalDecodingEnabled()) + { + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding is not enabled")); + return false; + } /* * A physical replication slot(primary_slot_name) is required on the diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index c11e588d6322..68692d8d7987 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -219,6 +220,8 @@ ReplicationSlotsShmemInit(void) /* First time through, so initialize */ MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize()); + pg_atomic_init_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 0); + for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; @@ -459,7 +462,10 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. 
*/ if (SlotIsLogical(slot)) + { pgstat_create_replslot(slot); + pg_atomic_add_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1); + } /* * Now that the slot has been marked as in_use and active, it's safe to @@ -806,6 +812,8 @@ ReplicationSlotCleanup(bool synced_only) } LWLockRelease(ReplicationSlotControlLock); + + DisableLogicalDecodingIfNecessary(); } /* @@ -920,6 +928,7 @@ void ReplicationSlotDropAcquired(void) { ReplicationSlot *slot = MyReplicationSlot; + bool was_logical_slot = SlotIsLogical(slot); Assert(MyReplicationSlot != NULL); @@ -927,6 +936,9 @@ ReplicationSlotDropAcquired(void) MyReplicationSlot = NULL; ReplicationSlotDropPtr(slot); + + if (was_logical_slot) + DisableLogicalDecodingIfNecessary(); } /* @@ -1027,7 +1039,10 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * another session. */ if (SlotIsLogical(slot)) + { pgstat_drop_replslot(slot); + pg_atomic_sub_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1); + } /* * We release this at the very end, so that nobody starts trying to create @@ -2577,12 +2592,12 @@ RestoreSlotFromDisk(const char *name) */ if (cp.slotdata.database != InvalidOid) { - if (wal_level < WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"", + errmsg("logical replication slot \"%s\" exists, but logical decoding is not enabled", NameStr(cp.slotdata.name)), - errhint("Change \"wal_level\" to be \"logical\" or higher."))); + errhint("Change \"wal_level\" to be \"replica\" or higher."))); /* * In standby mode, the hot standby must be enabled. 
This check is @@ -2645,6 +2660,9 @@ RestoreSlotFromDisk(const char *name) ReplicationSlotSetInactiveSince(slot, now, false); restored = true; + + if (SlotIsLogical(slot)) + pg_atomic_add_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1); break; } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 36cc2ed4e440..d4f45be793d8 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -18,6 +18,7 @@ #include "access/xlogutils.h" #include "funcapi.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "utils/builtins.h" @@ -116,13 +117,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, - bool failover, + bool failover, bool immediately_reserve, XLogRecPtr restart_lsn, bool find_startpoint) { LogicalDecodingContext *ctx = NULL; Assert(!MyReplicationSlot); + Assert(plugin != NULL); /* * Acquire a logical decoding slot, this will check for conflicting names. @@ -136,30 +138,55 @@ create_logical_replication_slot(char *name, char *plugin, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, failover, false); - /* - * Create logical decoding context to find start point or, if we don't - * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity. - * - * Note: when !find_startpoint this is still important, because it's at - * this point that the output plugin is validated. 
- */ - ctx = CreateInitDecodingContext(plugin, NIL, - false, /* just catalogs is OK */ - restart_lsn, - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - NULL, NULL, NULL); + if (immediately_reserve) + { + /* + * Create logical decoding context to find start point or, if we don't + * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin + * sanity. + * + * Note: when !find_startpoint this is still important, because it's + * at this point that the output plugin is validated. + */ + ctx = CreateInitDecodingContext(plugin, NIL, + false, /* just catalogs is OK */ + restart_lsn, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); - /* - * If caller needs us to determine the decoding start point, do so now. - * This might take a while. - */ - if (find_startpoint) - DecodingContextFindStartpoint(ctx); + /* + * If caller needs us to determine the decoding start point, do so + * now. This might take a while. + */ + if (find_startpoint) + DecodingContextFindStartpoint(ctx); + + /* don't need the decoding context anymore */ + FreeDecodingContext(ctx); + } + else + { + NameData plugin_name; + + /* + * On a standby, this check is also required while creating the slot. + * Check the comments in the function. + */ + CheckLogicalDecodingRequirements(); - /* don't need the decoding context anymore */ - FreeDecodingContext(ctx); + /* + * Register output plugin name with slot. We need the mutex to avoid + * concurrent reading of a partially copied string. But we don't want + * any complicated code while holding a spinlock, so do namestrcpy() + * outside. 
+ */ + namestrcpy(&plugin_name, plugin); + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.plugin = plugin_name; + SpinLockRelease(&MyReplicationSlot->mutex); + } } /* @@ -173,6 +200,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); bool failover = PG_GETARG_BOOL(4); + bool immediately_reserve = PG_GETARG_BOOL(5); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -184,6 +212,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) CheckSlotPermissions(); + EnsureLogicalDecodingEnabled(); CheckLogicalDecodingRequirements(); create_logical_replication_slot(NameStr(*name), @@ -191,6 +220,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) temporary, two_phase, failover, + immediately_reserve, InvalidXLogRecPtr, true); @@ -726,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, false, + true, src_restart_lsn, false); } @@ -905,6 +936,7 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("replication slots can only be synchronized to a standby server")); + EnsureLogicalDecodingEnabled(); ValidateSlotSyncParams(ERROR); /* Load the libpq-specific functions */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f2c33250e8b2..2af61f4b8ba9 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -71,6 +71,7 @@ #include "postmaster/interrupt.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/snapbuild.h" @@ -1218,6 +1219,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(cmd->kind == REPLICATION_KIND_LOGICAL); + EnsureLogicalDecodingEnabled(); CheckLogicalDecodingRequirements(); /* @@ -1462,12 +1464,12 @@ 
StartLogicalReplication(StartReplicationCmd *cmd) * are reported early. */ logical_decoding_ctx = - CreateDecodingContext(cmd->startpoint, cmd->options, false, - XL_ROUTINE(.page_read = logical_read_xlog_page, - .segment_open = WalSndSegmentOpen, - .segment_close = wal_segment_close), - WalSndPrepareWrite, WalSndWriteData, - WalSndUpdateProgress); + CreateOrInitDecodingContext(cmd->startpoint, cmd->options, false, + XL_ROUTINE(.page_read = logical_read_xlog_page, + .segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + WalSndPrepareWrite, WalSndWriteData, + WalSndUpdateProgress); xlogreader = logical_decoding_ctx->reader; WalSndSetState(WALSNDSTATE_CATCHUP); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2fa045e6b0f6..f1ef837755c2 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -31,6 +31,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/walsummarizer.h" +#include "replication/logicalctl.h" #include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/slot.h" @@ -150,6 +151,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, InjectionPointShmemSize()); size = add_size(size, SlotSyncShmemSize()); size = add_size(size, AioShmemSize()); + size = add_size(size, LogicalDecodingCtlShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -343,6 +345,7 @@ CreateOrAttachShmemStructs(void) WaitEventCustomShmemInit(); InjectionPointShmemInit(); AioShmemInit(); + LogicalDecodingCtlShmemInit(); } /* diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index a9bb540b55ac..0c3788cb8364 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -22,6 +22,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "port/pg_bitutils.h" +#include 
"replication/logicalctl.h" #include "replication/logicalworker.h" #include "replication/walsender.h" #include "storage/condition_variable.h" @@ -576,6 +577,9 @@ ProcessProcSignalBarrier(void) case PROCSIGNAL_BARRIER_SMGRRELEASE: processed = ProcessBarrierSmgrRelease(); break; + case PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO: + processed = ProcessBarrierUpdateXLogLogicalInfo(); + break; } /* diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 7fa8d9247e09..c34403c6e8cd 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,6 +24,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/proc.h" @@ -499,7 +500,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * seems OK, given that this kind of conflict should not normally be * reached, e.g. due to using a physical replication slot. */ - if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + if (IsLogicalDecodingEnabled() && isCatalogRel) InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); } @@ -1325,13 +1326,13 @@ LogStandbySnapshot(void) * record. Fortunately this routine isn't executed frequently, and it's * only a shared lock. */ - if (wal_level < WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) LWLockRelease(ProcArrayLock); recptr = LogCurrentRunningXacts(running); /* Release lock if we kept it longer ... 
*/ - if (wal_level >= WAL_LEVEL_LOGICAL) + if (IsLogicalDecodingEnabled()) LWLockRelease(ProcArrayLock); /* GetRunningTransactionData() acquired XidGenLock, we must release it */ diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 4da68312b5f9..311fed2861be 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -133,6 +133,7 @@ HASH_GROW_BUCKETS_ELECT "Waiting to elect a Parallel Hash participant to allocat HASH_GROW_BUCKETS_REALLOCATE "Waiting for an elected Parallel Hash participant to finish allocating more buckets." HASH_GROW_BUCKETS_REINSERT "Waiting for other Parallel Hash participants to finish inserting tuples into new buckets." LOGICAL_APPLY_SEND_DATA "Waiting for a logical replication leader apply process to send data to a parallel apply process." +LOGICAL_DECODING_STATUS_CHANGE "Waiting for the logical decoding status change." LOGICAL_PARALLEL_APPLY_STATE_CHANGE "Waiting for a logical replication parallel apply process to change state." LOGICAL_SYNC_DATA "Waiting for a logical replication remote server to send data for initial table synchronization." LOGICAL_SYNC_STATE_CHANGE "Waiting for a logical replication remote server to change state." @@ -352,6 +353,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry." InjectionPoint "Waiting to read or update information related to injection points." SerialControl "Waiting to read or update shared pg_serial state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." +LogicalDecodingControl "Waiting to access logical decoding status information." 
# # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index c86ceefda940..87bceae34067 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -40,6 +40,7 @@ #include "pgstat.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/walsender.h" @@ -658,6 +659,9 @@ BaseInit(void) /* Initialize lock manager's local structs */ InitLockManagerAccess(); + /* Initialize logical info WAL logging state */ + InitializeProcessXLogLogicalInfo(); + /* * Initialize replication slots after pgstat. The exit hook might need to * drop ephemeral slots, which in turn triggers stats reporting. diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f04bfedb2fd1..dfd92336106f 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -5234,6 +5234,17 @@ struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"effective_wal_level", PGC_INTERNAL, PRESET_OPTIONS, + gettext_noop("Show the effective WAL level."), + NULL, + GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE + }, + &effective_wal_level, + WAL_LEVEL_REPLICA, wal_level_options, + NULL, NULL, show_effective_wal_level + }, + { {"dynamic_shared_memory_type", PGC_POSTMASTER, RESOURCES_MEM, gettext_noop("Selects the dynamic shared memory implementation used."), diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index d313099c027f..08ec587ef770 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -94,6 +94,9 @@ typedef enum RecoveryState } RecoveryState; extern PGDLLIMPORT int wal_level; +extern PGDLLIMPORT int effective_wal_level; + +extern PGDLLIMPORT bool XLogLogicalInfo; /* Is WAL archiving enabled (always or only while server is running normally)?
*/ #define XLogArchivingActive() \ @@ -123,7 +126,7 @@ extern PGDLLIMPORT int wal_level; #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) /* Do we need to WAL-log information required only for logical replication? */ -#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) +#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL || XLogLogicalInfo) #ifdef WAL_DEBUG extern PGDLLIMPORT bool XLOG_DEBUG; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 63e834a6ce47..5e9b44c82f01 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -80,6 +80,7 @@ typedef struct CheckPoint /* 0xC0 is used in Postgres 9.5-11 */ #define XLOG_OVERWRITE_CONTRECORD 0xD0 #define XLOG_CHECKPOINT_REDO 0xE0 +#define XLOG_LOGICAL_DECODING_STATUS_CHANGE 0xF0 /* @@ -136,6 +137,8 @@ typedef struct ControlFileData XLogRecPtr unloggedLSN; /* current fake LSN value, for unlogged rels */ + bool logicalDecodingEnabled; + /* * These two values determine the minimum point we must recover up to * before starting up: diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d3d28a263fa9..c5ef5a1a4cfa 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11470,10 +11470,10 @@ { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool bool', - proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', + proargtypes => 'name name bool bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,immediately_reserve,slot_name,lsn}', prosrc => 
'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9c..481ea53e7e14 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -132,6 +132,13 @@ extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress); +extern LogicalDecodingContext *CreateOrInitDecodingContext(XLogRecPtr restart_lsn, + List *output_plugin_options, + bool fast_forward, + XLogReaderRoutine *xl_routine, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write, + LogicalOutputPluginWriterUpdateProgress update_progress); extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h new file mode 100644 index 000000000000..e5837a5e778a --- /dev/null +++ b/src/include/replication/logicalctl.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * + * logicalctl.h + * Definitions for logical decoding status control facility.
+ * + * Portions Copyright (c) 2013-2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/replication/logicalctl.h + * + *------------------------------------------------------------------------- + */ +#ifndef LOGICALCTL_H +#define LOGICALCTL_H + +#include "access/xlog.h" +#include "port/atomics.h" +#include "storage/condition_variable.h" + +typedef struct LogicalDecodingCtlData +{ + /* True while the logical decoding status is being changed */ + bool transition_in_progress; + + /* Condition variable signaled when a transition completes */ + ConditionVariable transition_cv; + + /* + * xlog_logical_info is the authoritative value used by all processes to + * determine whether to write additional information required by logical + * decoding to WAL. Since this information could be checked frequently, + * each process caches this value in XLogLogicalInfo for better + * performance. + * + * logical_decoding_enabled is true if we allow creating logical slots and + * the logical decoding is enabled. + * + * Both fields are initialized at server startup time. On standbys, these + * values are synchronized to the primary's values when replaying the + * XLOG_LOGICAL_DECODING_STATUS_CHANGE record.
+ */ + pg_atomic_flag xlog_logical_info; + pg_atomic_flag logical_decoding_enabled; +} LogicalDecodingCtlData; +extern LogicalDecodingCtlData * LogicalDecodingCtl; + +extern Size LogicalDecodingCtlShmemSize(void); +extern void LogicalDecodingCtlShmemInit(void); +extern void StartupLogicalDecodingStatus(bool status_in_control_file); +extern void InitializeProcessXLogLogicalInfo(void); +extern bool ProcessBarrierUpdateXLogLogicalInfo(void); +extern bool IsLogicalDecodingEnabled(void); +extern bool IsXLogLogicalInfoEnabled(void); +extern void EnsureLogicalDecodingEnabled(void); +extern void DisableLogicalDecodingIfNecessary(void); +extern void UpdateLogicalDecodingStatus(bool new_status); +extern void UpdateLogicalDecodingStatusEndOfRecovery(void); + +#endif diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index ffacba9d2ae5..4ad2b1fb4719 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,6 +15,7 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" +#include "port/atomics.h" #include "replication/walreceiver.h" /* directory to store replication slot data in */ @@ -233,6 +234,8 @@ typedef struct ReplicationSlot */ typedef struct ReplicationSlotCtlData { + pg_atomic_uint32 n_inuse_logical_slots; + /* * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some * reason you can't do that in an otherwise-empty struct. 
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index a9681738146e..4d78367878b1 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) +PG_LWLOCK(54, LogicalDecodingControl) diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index afeeb1ca019f..8e428f298c66 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -54,6 +54,8 @@ typedef enum typedef enum { PROCSIGNAL_BARRIER_SMGRRELEASE, /* ask smgr to close files */ + PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, /* ask to update + * XLogLogicalInfo */ } ProcSignalBarrierType; /* diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 799fa7ace684..d6f3f4cdf098 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -61,6 +61,7 @@ extern bool check_default_text_search_config(char **newval, void **extra, GucSou extern void assign_default_text_search_config(const char *newval, void *extra); extern bool check_default_with_oids(bool *newval, void **extra, GucSource source); +extern const char *show_effective_wal_level(void); extern bool check_huge_page_size(int *newval, void **extra, GucSource source); extern void assign_io_method(int newval, void *extra); extern bool check_io_max_concurrency(int *newval, void **extra, GucSource source); diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index 921813483e37..344e6b2f5c9e 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -876,7 +876,7 @@ sub wait_until_vacuum_can_remove make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr); # We are not able to read from the slot as it requires wal_level >= logical on 
the primary server check_pg_recvlogical_stderr($handle, - "logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary" + "logical decoding needs to be enabled on the primary" ); # Restore primary wal_level diff --git a/src/test/recovery/t/046_effective_wal_level.pl b/src/test/recovery/t/046_effective_wal_level.pl new file mode 100644 index 000000000000..89df52464047 --- /dev/null +++ b/src/test/recovery/t/046_effective_wal_level.pl @@ -0,0 +1,170 @@ + +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init( + allows_streaming => 1 + ); +$primary->append_conf('postgresql.conf', "log_min_messages = debug1"); +$primary->start(); + +# Check both wal_level and effective_wal_level values. +is( $primary->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|replica", + "wal_level and effective_wal_level starts with the same value 'replica'"); + +$primary->safe_psql('postgres', + qq[select pg_create_physical_replication_slot('test_phy_slot', false, false)]); +is( $primary->safe_psql('postgres', qq[show effective_wal_level]), + "replica", + "effective_wal_level doesn't change with a new physical slot"); + +# Create a new logical slot +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); + +# effective_wal_level must be bumped to 'logical' +is( $primary->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level bumped to logical upon logical slot creation"); + +# restart the server and check again. 
+$primary->restart(); +is( $primary->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level becomes logical during startup"); + +# Take a backup while effective_wal_level is 'logical'. +$primary->backup('my_backup'); + +# Initialize standby1 node from the backup 'my_backup'. Note that the +# backup was taken while logical decoding was enabled on the +# primary because of one logical slot, but replication slots are not +# included in the base backup. +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, 'my_backup', + has_streaming => 1); +$standby1->set_standby_mode(); +$standby1->start; +is( $standby1->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level='logical' on standby"); + +# Promote the standby1 node that doesn't have any logical slot. So +# the logical decoding must be disabled at promotion. +$standby1->promote; +is( $standby1->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|replica", + "effective_wal_level got decrased to 'replica' during promotion"); +$standby1->stop; + +# Initialize standby2 node from the backup 'my_backup'. +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($primary, 'my_backup', + has_streaming => 1); +$standby2->set_standby_mode(); +$standby2->start; + +# Create a logical slot on the standby, which should succeed +# as logical decoding is enabled on the primary. +$standby2->create_logical_slot_on_standby($primary, 'standby2_slot', 'postgres'); + +# Promote the standby2 node that has one logical slot. So the logical decoding +# stays enabled even after the promotion.
+$standby2->promote; +is( $standby2->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level keeps 'logical' even after the promotion"); +$standby2->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('standby2_slot2', 'pgoutput')]); +$standby2->stop; + +# Initialize standby3 which uses 'logical' WAL level. +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup($primary, 'my_backup', + has_streaming => 1); +$standby3->set_standby_mode(); +$standby3->append_conf('postgresql.conf', + qq[wal_level = 'logical']); +$standby3->start(); +$standby3->backup('my_backup3'); + +# Initialize cascade standby, which uses 'replica' WAL level. +my $cascade = PostgreSQL::Test::Cluster->new('cascade'); +$cascade->init_from_backup($standby3, 'my_backup3', + has_streaming => 1); +$cascade->adjust_conf('postgresql.conf', 'wal_level', 'replica'); +$cascade->set_standby_mode(); +$cascade->start(); + +# Regardless of their wal_level values, effective_wal_level values on the +# standby and the cascaded standby depend on the primary's value, 'logical'. +is( $standby3->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "logical|logical", + "check wal_level and effective_wal_level on standby"); +is( $cascade->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "check wal_level and effective_wal_level on cascaded standby"); + +# Drop the primary's last logical slot, disabling the logical decoding. 
+$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); + +$primary->wait_for_replay_catchup($standby3); +$standby3->wait_for_replay_catchup($cascade, $primary); + +is( $primary->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|replica", + "effective_wal_level got decreased to 'replica' on primary"); +is( $standby3->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "logical|replica", + "effective_wal_level got decreased to 'replica' on standby"); +is( $cascade->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|replica", + "effective_wal_level got decreased to 'replica' on cascaded standby"); + +# Promote standby3. It enables the logical decoding at promotion as it uses +# 'logical' WAL level. +$standby3->promote; +$standby3->wait_for_replay_catchup($cascade); + +is( $cascade->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level got increased to 'logical' on cascaded standby"); + +$standby3->stop; +$cascade->stop; + + +$primary->stop; +done_testing(); diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 916fdb48b3b3..51f102f0c9fb 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -589,7 +589,7 @@ ROLLBACK; }); ok( $reterr =~ - m/WARNING: "wal_level" is insufficient to publish logical changes/, + m/WARNING: logical decoding needs to be enabled to publish logical changes/, 'CREATE PUBLICATION while "wal_level=minimal"'); done_testing();