#include "postmaster/syslogger.h"
#include "postmaster/walsummarizer.h"
#include "replication/logicallauncher.h"
+#include "replication/slotsync.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
* they will never become live backends. dead_end children are not assigned a
* PMChildSlot. dead_end children have bkend_type NORMAL.
*
- * "Special" children such as the startup, bgwriter and autovacuum launcher
- * tasks are not in this list. They are tracked via StartupPID and other
- * pid_t variables below. (Thus, there can't be more than one of any given
- * "special" child process type. We use BackendList entries for any child
- * process there can be more than one of.)
+ * "Special" children such as the startup, bgwriter, autovacuum launcher, and
+ * slot sync worker tasks are not in this list. They are tracked via StartupPID
+ * and other pid_t variables below. (Thus, there can't be more than one of any
+ * given "special" child process type. We use BackendList entries for any
+ * child process there can be more than one of.)
*/
typedef struct bkend
{
WalSummarizerPID = 0,
AutoVacPID = 0,
PgArchPID = 0,
- SysLoggerPID = 0;
+ SysLoggerPID = 0,
+ SlotSyncWorkerPID = 0;
/* Startup process's status */
typedef enum
static void MaybeStartWalReceiver(void);
static void MaybeStartWalSummarizer(void);
static void InitPostmasterDeathWatchHandle(void);
+static void MaybeStartSlotSyncWorker(void);
/*
* Archiver is allowed to start up at the current postmaster state?
if (PgArchPID == 0 && PgArchStartupAllowed())
PgArchPID = StartChildProcess(ArchiverProcess);
+ /* If we need to start a slot sync worker, try to do that now */
+ MaybeStartSlotSyncWorker();
+
/* If we need to signal the autovacuum launcher, do so now */
if (avlauncher_needs_signal)
{
signal_child(PgArchPID, SIGHUP);
if (SysLoggerPID != 0)
signal_child(SysLoggerPID, SIGHUP);
+ if (SlotSyncWorkerPID != 0)
+ signal_child(SlotSyncWorkerPID, SIGHUP);
/* Reload authentication config files too */
if (!load_hba())
AutoVacPID = StartAutoVacLauncher();
if (PgArchStartupAllowed() && PgArchPID == 0)
PgArchPID = StartChildProcess(ArchiverProcess);
+ MaybeStartSlotSyncWorker();
/* workers may be scheduled to start now */
maybe_start_bgworkers();
continue;
}
+ /*
+ * Was it the slot sync worker? Normal exit or FATAL exit can be
+ * ignored (FATAL can be caused by libpqwalreceiver on receiving a
+ * shutdown request from the startup process during promotion); we'll
+ * start a new one at the next iteration of the postmaster's main
+ * loop, if necessary. Any other exit condition is treated as a crash.
+ */
+ if (pid == SlotSyncWorkerPID)
+ {
+ SlotSyncWorkerPID = 0;
+ if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus))
+ HandleChildCrash(pid, exitstatus,
+ _("slot sync worker process"));
+ continue;
+ }
+
/* Was it one of our background workers? */
if (CleanupBackgroundWorker(pid, exitstatus))
{
/*
* HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer,
- * walwriter, autovacuum, archiver or background worker.
+ * walwriter, autovacuum, archiver, slot sync worker, or background worker.
*
* The objectives here are to clean up our local state about the child
* process, and to signal all other remaining children to quickdie.
else if (PgArchPID != 0 && take_action)
sigquit_child(PgArchPID);
+ /* Take care of the slot sync worker too */
+ if (pid == SlotSyncWorkerPID)
+ SlotSyncWorkerPID = 0;
+ else if (SlotSyncWorkerPID != 0 && take_action)
+ sigquit_child(SlotSyncWorkerPID);
+
/* We do NOT restart the syslogger */
if (Shutdown != ImmediateShutdown)
signal_child(WalReceiverPID, SIGTERM);
if (WalSummarizerPID != 0)
signal_child(WalSummarizerPID, SIGTERM);
+ if (SlotSyncWorkerPID != 0)
+ signal_child(SlotSyncWorkerPID, SIGTERM);
/* checkpointer, archiver, stats, and syslogger may continue for now */
/* Now transition to PM_WAIT_BACKENDS state to wait for them to die */
/*
* PM_WAIT_BACKENDS state ends when we have no regular backends
* (including autovac workers), no bgworkers (including unconnected
- * ones), and no walwriter, autovac launcher or bgwriter. If we are
- * doing crash recovery or an immediate shutdown then we expect the
- * checkpointer to exit as well, otherwise not. The stats and
- * syslogger processes are disregarded since they are not connected to
- * shared memory; we also disregard dead_end children here. Walsenders
- * and archiver are also disregarded, they will be terminated later
- * after writing the checkpoint record.
+ * ones), and no walwriter, autovac launcher, bgwriter or slot sync
+ * worker. If we are doing crash recovery or an immediate shutdown
+ * then we expect the checkpointer to exit as well, otherwise not. The
+ * stats and syslogger processes are disregarded since they are not
+ * connected to shared memory; we also disregard dead_end children
+ * here. Walsenders and archiver are also disregarded; they will be
+ * terminated later after writing the checkpoint record.
*/
if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 &&
StartupPID == 0 &&
(CheckpointerPID == 0 ||
(!FatalError && Shutdown < ImmediateShutdown)) &&
WalWriterPID == 0 &&
- AutoVacPID == 0)
+ AutoVacPID == 0 &&
+ SlotSyncWorkerPID == 0)
{
if (Shutdown >= ImmediateShutdown || FatalError)
{
Assert(CheckpointerPID == 0);
Assert(WalWriterPID == 0);
Assert(AutoVacPID == 0);
+ Assert(SlotSyncWorkerPID == 0);
/* syslogger is not considered here */
pmState = PM_NO_CHILDREN;
}
signal_child(AutoVacPID, signal);
if (PgArchPID != 0)
signal_child(PgArchPID, signal);
+ if (SlotSyncWorkerPID != 0)
+ signal_child(SlotSyncWorkerPID, signal);
}
/*
*/
if (strcmp(argv[1], "--forkbackend") == 0 ||
strcmp(argv[1], "--forkavlauncher") == 0 ||
+ strcmp(argv[1], "--forkssworker") == 0 ||
strcmp(argv[1], "--forkavworker") == 0 ||
strcmp(argv[1], "--forkaux") == 0 ||
strcmp(argv[1], "--forkbgworker") == 0)
AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */
}
+ if (strcmp(argv[1], "--forkssworker") == 0)
+ {
+ /* Restore basic shared memory pointers */
+ InitShmemAccess(UsedShmemSegAddr);
+
+ ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */
+ }
if (strcmp(argv[1], "--forkbgworker") == 0)
{
/* do this as early as possible; in particular, before InitProcess() */
}
+/*
+ * MaybeStartSlotSyncWorker
+ * Start the slot sync worker, if not running and our state allows.
+ *
+ * The worker is allowed to start when we are on a hot standby, no fast or
+ * immediate shutdown is in progress, the slot sync parameters are configured
+ * correctly, and either this is the worker's first launch or enough time has
+ * passed since it was last launched.
+ */
+static void
+MaybeStartSlotSyncWorker(void)
+{
+ if (SlotSyncWorkerPID == 0 && pmState == PM_HOT_STANDBY &&
+ Shutdown <= SmartShutdown && sync_replication_slots &&
+ ValidateSlotSyncParams(LOG) && SlotSyncWorkerCanRestart())
+ SlotSyncWorkerPID = StartSlotSyncWorker();
+}
+
/*
* Create the opts file
*/
*
* This file contains the code for slot synchronization on a physical standby
* to fetch logical failover slots information from the primary server, create
- * the slots on the standby and synchronize them. This is done by a call to SQL
- * function pg_sync_replication_slots.
+ * the slots on the standby and synchronize them periodically.
*
- * If on physical standby, the WAL corresponding to the remote's restart_lsn
- * is not available or the remote's catalog_xmin precedes the oldest xid for which
- * it is guaranteed that rows wouldn't have been removed then we cannot create
- * the local standby slot because that would mean moving the local slot
+ * Slot synchronization can be performed either automatically by enabling the
+ * slot sync worker or manually by calling the SQL function
+ * pg_sync_replication_slots().
+ *
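+ * For example, a one-off manual sync can be triggered on the standby with
+ * "SELECT pg_sync_replication_slots();", provided the configuration
+ * requirements checked in ValidateSlotSyncParams() are met.
+ *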
+ * If the WAL corresponding to the remote's restart_lsn is not available on the
+ * physical standby or the remote's catalog_xmin precedes the oldest xid for
+ * which it is guaranteed that rows wouldn't have been removed then we cannot
+ * create the local standby slot because that would mean moving the local slot
* backward and decoding won't be possible via such a slot. In this case, the
* slot will be marked as RS_TEMPORARY. Once the primary server catches up,
* the slot will be marked as RS_PERSISTENT (which means sync-ready) after
- * which we can call pg_sync_replication_slots() periodically to perform
- * syncs.
+ * which the slot sync worker can sync it periodically, or the user can call
+ * pg_sync_replication_slots() periodically to perform the syncs.
+ *
+ * The slot sync worker waits for some time before the next synchronization,
+ * with the duration varying based on whether any slots were updated during
+ * the last cycle. Refer to the comments above wait_for_slot_activity() for
+ * more details.
*
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized. See comment atop drop_local_obsolete_slots() for more
#include "postgres.h"
+#include <time.h>
+
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
+#include "libpq/pqsignal.h"
+#include "pgstat.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/interrupt.h"
+#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/proc.h"
#include "storage/procarray.h"
+#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
+#include "utils/ps_status.h"
+#include "utils/timeout.h"
-/* Struct for sharing information to control slot synchronization. */
+/*
+ * Struct for sharing information to control slot synchronization.
+ *
+ * The slot sync worker's pid is needed by the startup process to shut it
+ * down during promotion. The startup process shuts down the slot sync worker
+ * and also sets stopSignaled=true to handle the race condition when the
+ * postmaster has not noticed the promotion yet and thus may end up restarting
+ * the slot sync worker. If stopSignaled is set, the worker will exit in such a
+ * case. Note that this variable does not need to be reset: after promotion
+ * the slot sync worker won't be restarted, because pmState changes from
+ * PM_HOT_STANDBY to PM_RUN and we don't support demoting the primary without
+ * restarting the server. See MaybeStartSlotSyncWorker.
+ *
+ * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
+ * overwrites.
+ *
+ * The 'last_start_time' is needed by the postmaster to start the slot sync
+ * worker at most once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an
+ * immediate restart is expected (e.g., slot sync GUCs change), the slot sync
+ * worker resets last_start_time before exiting, so that the postmaster can
+ * restart the worker without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ *
+ * All the fields except 'syncing' are used only by the slot sync worker;
+ * 'syncing' is used both by the worker and by the SQL function
+ * pg_sync_replication_slots().
+ */
typedef struct SlotSyncCtxStruct
{
- /* prevents concurrent slot syncs to avoid slot overwrites */
+ pid_t pid;
+ bool stopSignaled;
bool syncing;
+ time_t last_start_time;
slock_t mutex;
} SlotSyncCtxStruct;
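+
+/* Pointer to the shared-memory instance, set up by SlotSyncShmemInit(). */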
SlotSyncCtxStruct *SlotSyncCtx = NULL;
+/* GUC variable */
+bool sync_replication_slots = false;
+
+/*
+ * The sleep time (ms) between slot-sync cycles varies dynamically
+ * (within a MIN/MAX range) according to slot activity. See
+ * wait_for_slot_activity() for details.
+ */
+#define MIN_WORKER_NAPTIME_MS 200
+#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */
+
+static long sleep_ms = MIN_WORKER_NAPTIME_MS;
+
+/* The restart interval for the slot sync worker, used by the postmaster */
+#define SLOTSYNC_RESTART_INTERVAL_SEC 10
+
+/* Flag to tell if we are in a slot sync worker process */
+static bool am_slotsync_worker = false;
+
/*
* Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
* in SlotSyncCtxStruct, this flag is true only if the current process is
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
+#ifdef EXEC_BACKEND
+static pid_t slotsyncworker_forkexec(void);
+#endif
+NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn();
+
+static void slotsync_failure_callback(int code, Datum arg);
+
/*
* If necessary, update the local synced slot's metadata based on the data
* from the remote slot.
* If the remote restart_lsn and catalog_xmin have caught up with the
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
+ *
+ * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
+ * false.
*/
-static void
+static bool
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
slot->data.catalog_xmin));
- return;
+ return false;
}
/* First time slot update, the function must return true */
ereport(LOG,
errmsg("newly created slot \"%s\" is sync-ready now",
remote_slot->name));
+
+ return true;
}
/*
* the remote_slot catches up with locally reserved position and local slot is
* updated. The slot is then persisted and is considered as sync-ready for
* periodic syncs.
+ *
+ * Returns TRUE if the local slot is updated.
*/
-static void
+static bool
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
+ bool slot_updated = false;
/*
* Make sure that concerned WAL is received and flushed before syncing
*/
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (remote_slot->confirmed_lsn > latestFlushPtr)
- elog(ERROR,
- "skipping slot synchronization as the received slot sync"
- " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
- LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
- remote_slot->name,
- LSN_FORMAT_ARGS(latestFlushPtr));
+ {
+ ereport(am_slotsync_worker ? LOG : ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("skipping slot synchronization as the received slot sync"
+ " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+ remote_slot->name,
+ LSN_FORMAT_ARGS(latestFlushPtr)));
+
+ return false;
+ }
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
/* Make sure the invalidated state persists across server restart */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
+
+ slot_updated = true;
}
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
ReplicationSlotRelease();
- return;
+ return slot_updated;
}
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
- update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+ slot_updated = update_and_persist_local_synced_slot(remote_slot,
+ remote_dbid);
}
/* Slot ready for sync, so sync it. */
{
ReplicationSlotMarkDirty();
ReplicationSlotSave();
+
+ slot_updated = true;
}
}
}
/* Skip creating the local slot if remote_slot is invalidated already */
if (remote_slot->invalidated != RS_INVAL_NONE)
- return;
+ return false;
/*
* We create temporary slots instead of ephemeral slots here because
LWLockRelease(ProcArrayLock);
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+
+ slot_updated = true;
}
ReplicationSlotRelease();
+
+ return slot_updated;
}
/*
*
* Gets the failover logical slots info from the primary server and updates
* the slots locally. Creates the slots if not present on the standby.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
*/
-static void
+static bool
synchronize_slots(WalReceiverConn *wrconn)
{
#define SLOTSYNC_COLUMN_COUNT 9
WalRcvExecResult *res;
TupleTableSlot *tupslot;
List *remote_slot_list = NIL;
+ bool some_slot_updated = false;
+ bool started_tx = false;
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, failover,"
" database, conflict_reason"
syncing_slots = true;
+ /* The syscache access in walrcv_exec() needs a transaction env. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
/* Execute the query */
res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
-
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
- synchronize_one_slot(remote_slot, remote_dbid);
+ some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
walrcv_clear_result(res);
+ if (started_tx)
+ CommitTransactionCommand();
+
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
+
+ return some_slot_updated;
}
/*
TupleTableSlot *tupslot;
bool remote_in_recovery;
bool primary_slot_valid;
+ bool started_tx = false;
initStringInfo(&cmd);
appendStringInfo(&cmd,
" WHERE slot_type='physical' AND slot_name=%s",
quote_literal_cstr(PrimarySlotName));
+ /* The syscache access in walrcv_exec() needs a transaction env. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
pfree(cmd.data);
ExecClearTuple(tupslot);
walrcv_clear_result(res);
+
+ if (started_tx)
+ CommitTransactionCommand();
}
/*
- * Check all necessary GUCs for slot synchronization are set
- * appropriately, otherwise, raise ERROR.
+ * Checks if dbname is specified in 'primary_conninfo'.
+ *
+ * Errors out if it is not specified; otherwise returns it.
*/
-void
-ValidateSlotSyncParams(void)
+char *
+CheckAndGetDbnameFromConninfo(void)
{
char *dbname;
+ /*
+ * The slot synchronization needs a database connection for walrcv_exec to
+ * work.
+ */
+ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
+ if (dbname == NULL)
+ ereport(ERROR,
+
+ /*
+ * translator: dbname is a specific option; %s is a GUC variable name
+ */
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("slot synchronization requires dbname to be specified in %s",
+ "primary_conninfo"));
+ return dbname;
+}
+
+/*
+ * Return true if all necessary GUCs for slot synchronization are set
+ * appropriately; otherwise, return false.
+ */
+bool
+ValidateSlotSyncParams(int elevel)
+{
+ /*
+ * Logical slot sync/creation requires wal_level >= logical.
+ *
+ * Since altering wal_level requires a server restart, error out in this
+ * case regardless of the elevel provided by the caller.
+ */
+ if (wal_level < WAL_LEVEL_LOGICAL)
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("slot synchronization requires wal_level >= \"logical\""));
+ return false;
+ }
+
/*
* A physical replication slot(primary_slot_name) is required on the
* primary to ensure that the rows needed by the standby are not removed
* be invalidated.
*/
if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
+ return false;
+ }
/*
* hot_standby_feedback must be enabled to cooperate with the physical
* catalog_xmin values on the standby.
*/
if (!hot_standby_feedback)
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be enabled",
"hot_standby_feedback"));
-
- /* Logical slot sync/creation requires wal_level >= logical. */
- if (wal_level < WAL_LEVEL_LOGICAL)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("slot synchronization requires wal_level >= \"logical\""));
+ return false;
+ }
/*
* The primary_conninfo is required to make connection to primary for
* getting slots information.
*/
if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
- ereport(ERROR,
+ {
+ ereport(elevel,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("slot synchronization requires %s to be defined",
"primary_conninfo"));
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Re-read the config file.
+ *
+ * Exit if any of the slot sync GUCs has changed; the postmaster will then
+ * restart the worker.
+ */
+static void
+slotsync_reread_config(void)
+{
+ char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
+ char *old_primary_slotname = pstrdup(PrimarySlotName);
+ bool old_sync_replication_slots = sync_replication_slots;
+ bool old_hot_standby_feedback = hot_standby_feedback;
+ bool conninfo_changed;
+ bool primary_slotname_changed;
+
+ Assert(sync_replication_slots);
+
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+
+ conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
+ primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
+ pfree(old_primary_conninfo);
+ pfree(old_primary_slotname);
+
+ if (old_sync_replication_slots != sync_replication_slots)
+ {
+ ereport(LOG,
+ /* translator: %s is a GUC variable name */
+ errmsg("slot sync worker will shut down because %s is disabled", "sync_replication_slots"));
+ proc_exit(0);
+ }
+
+ if (conninfo_changed ||
+ primary_slotname_changed ||
+ (old_hot_standby_feedback != hot_standby_feedback))
+ {
+ ereport(LOG,
+ errmsg("slot sync worker will restart because of a parameter change"));
+
+ /*
+ * Reset the last-start time for this worker so that the postmaster
+ * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
+ */
+ SlotSyncCtx->last_start_time = 0;
+
+ proc_exit(0);
+ }
+}
+
+/*
+ * Interrupt handler for main loop of slot sync worker.
+ */
+static void
+ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
+{
+ CHECK_FOR_INTERRUPTS();
+
+ if (ShutdownRequestPending)
+ {
+ ereport(LOG,
+ errmsg("slot sync worker is shutting down on receiving SIGINT"));
+
+ proc_exit(0);
+ }
+
+ if (ConfigReloadPending)
+ slotsync_reread_config();
+}
+
+/*
+ * Cleanup function for slotsync worker.
+ *
+ * Called on slotsync worker exit.
+ */
+static void
+slotsync_worker_onexit(int code, Datum arg)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ SlotSyncCtx->pid = InvalidPid;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+}
+
+/*
+ * Sleep long enough that the slots on the primary are likely to have been
+ * updated by the time we next sync.
+ *
+ * If there is no slot activity the wait time between sync-cycles will double
+ * (to a maximum of 30s). If there is some slot activity the wait time between
+ * sync-cycles is reset to the minimum (200ms).
+ */
+static void
+wait_for_slot_activity(bool some_slot_updated)
+{
+ int rc;
+
+ if (!some_slot_updated)
+ {
+ /*
+ * No slots were updated, so double the sleep time, but not beyond the
+ * maximum allowable value.
+ */
+ sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS);
+ }
+ else
+ {
+ /*
+ * Some slots were updated since the last sleep, so reset the sleep
+ * time.
+ */
+ sleep_ms = MIN_WORKER_NAPTIME_MS;
+ }
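+
+ /*
+ * Net effect: consecutive idle cycles nap for 400ms, 800ms, ... up to the
+ * 30s cap, while any cycle that updated a slot drops the nap back to 200ms.
+ */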
+
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ sleep_ms,
+ WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+}
+
+/*
+ * The main loop of our worker process.
+ *
+ * It connects to the primary server and periodically fetches logical
+ * failover slot information in order to create and sync the slots.
+ */
+NON_EXEC_STATIC void
+ReplSlotSyncWorkerMain(int argc, char *argv[])
+{
+ WalReceiverConn *wrconn = NULL;
+ char *dbname;
+ char *err;
+ sigjmp_buf local_sigjmp_buf;
+ StringInfoData app_name;
+
+ am_slotsync_worker = true;
+
+ MyBackendType = B_SLOTSYNC_WORKER;
+
+ init_ps_display(NULL);
+
+ SetProcessingMode(InitProcessing);
/*
- * The slot synchronization needs a database connection for walrcv_exec to
- * work.
+ * Create a per-backend PGPROC struct in shared memory. We must do this
+ * before we access any shared memory.
*/
- dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
- if (dbname == NULL)
- ereport(ERROR,
+ InitProcess();
+
+ /*
+ * Early initialization.
+ */
+ BaseInit();
+
+ Assert(SlotSyncCtx != NULL);
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+ Assert(SlotSyncCtx->pid == InvalidPid);
+
+ /*
+ * The startup process signaled the slot sync worker to stop, so if the
+ * postmaster has meanwhile started the worker again, exit.
+ */
+ if (SlotSyncCtx->stopSignaled)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ proc_exit(0);
+ }
+
+ /* Advertise our PID so that the startup process can kill us on promotion */
+ SlotSyncCtx->pid = MyProcPid;
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ ereport(LOG, errmsg("slot sync worker started"));
+
+ /* Register it as soon as SlotSyncCtx->pid is initialized. */
+ before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
+
+ /* Setup signal handling */
+ pqsignal(SIGHUP, SignalHandlerForConfigReload);
+ pqsignal(SIGINT, SignalHandlerForShutdownRequest);
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGCHLD, SIG_DFL);
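+
+ /*
+ * Note that SIGINT is wired to the shutdown-request handler: it is the
+ * signal ShutDownSlotSync() sends from the startup process at promotion to
+ * stop this worker.
+ */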
+
+ /*
+ * Establish the SIGALRM handler and initialize the timeout module. This is
+ * needed by InitPostgres to register different timeouts.
+ */
+ InitializeTimeouts();
+
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * We just need to clean up, report the error, and go away.
+ *
+ * If we do not have this handling here, then since this worker process
+ * operates at the bottom of the exception stack, ERRORs turn into FATALs.
+ * Therefore, we create our own exception handler to catch ERRORs.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* since not using PG_TRY, must reset error stack by hand */
+ error_context_stack = NULL;
+
+ /* Prevent interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
/*
- * translator: dbname is a specific option; %s is a GUC variable name
+ * We can now go away. Note that because we called InitProcess, a
+ * callback was registered to do ProcKill, which will clean up
+ * necessary state.
*/
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("slot synchronization requires dbname to be specified in %s",
- "primary_conninfo"));
+ proc_exit(0);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ /*
+ * Unblock signals (they were blocked when the postmaster forked us)
+ */
+ sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+ dbname = CheckAndGetDbnameFromConninfo();
+
+ /*
+ * Connect to the database specified by the user in primary_conninfo. We
+ * need a database connection for walrcv_exec to work which we use to
+ * fetch slot information from the remote node. See comments atop
+ * libpqrcv_exec.
+ *
+ * We do not specify a specific user here since the slot sync worker will
+ * operate as a superuser. This is safe because the slot sync worker does
+ * not interact with user tables, eliminating the risk of executing
+ * arbitrary code within triggers.
+ */
+ InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
+
+ SetProcessingMode(NormalProcessing);
+
+ initStringInfo(&app_name);
+ if (cluster_name[0])
+ appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
+ else
+ appendStringInfo(&app_name, "%s", "slotsync worker");
+
+ /*
+ * Establish the connection to the primary server for slot
+ * synchronization.
+ */
+ wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
+ app_name.data, &err);
+ pfree(app_name.data);
+
+ if (!wrconn)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not connect to the primary server: %s", err));
+
+ /*
+ * Register the failure callback once we have the connection.
+ *
+ * XXX: This can be combined with previous such cleanup registration of
+ * slotsync_worker_onexit(), but that would require making the connection a
+ * global variable, which we want to avoid.
+ */
+ before_shmem_exit(slotsync_failure_callback, PointerGetDatum(wrconn));
+
+ /*
+ * Using the specified primary server connection, check that we are not a
+ * cascading standby and that the slot configured in 'primary_slot_name'
+ * exists on the primary server.
+ */
+ validate_remote_info(wrconn);
+
+ /* Main loop to synchronize slots */
+ for (;;)
+ {
+ bool some_slot_updated = false;
+
+ ProcessSlotSyncInterrupts(wrconn);
+
+ some_slot_updated = synchronize_slots(wrconn);
+
+ wait_for_slot_activity(some_slot_updated);
+ }
+
+ /*
+ * The slot sync worker can't get here because it will only stop when it
+ * receives a SIGINT from the startup process, or when there is an error.
+ */
+ Assert(false);
+}
+
+/*
+ * Main entry point for slot sync worker process, to be called from the
+ * postmaster.
+ */
+int
+StartSlotSyncWorker(void)
+{
+ pid_t pid;
+
+#ifdef EXEC_BACKEND
+ switch ((pid = slotsyncworker_forkexec()))
+ {
+#else
+ switch ((pid = fork_process()))
+ {
+ case 0:
+ /* in postmaster child ... */
+ InitPostmasterChild();
+
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ ReplSlotSyncWorkerMain(0, NULL);
+ break;
+#endif
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork slot sync worker process: %m")));
+ return 0;
+
+ default:
+ return (int) pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+#ifdef EXEC_BACKEND
+/*
+ * The forkexec routine for the slot sync worker process.
+ *
+ * Format up the arglist, then fork and exec.
+ */
+static pid_t
+slotsyncworker_forkexec(void)
+{
+ char *av[10];
+ int ac = 0;
+
+ av[ac++] = "postgres";
+ av[ac++] = "--forkssworker";
+ av[ac++] = NULL; /* filled in by postmaster_forkexec */
+ av[ac] = NULL;
+
+ Assert(ac < lengthof(av));
+
+ return postmaster_forkexec(ac, av);
+}
+#endif
+
+/*
+ * Shut down the slot sync worker.
+ */
+void
+ShutDownSlotSync(void)
+{
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ SlotSyncCtx->stopSignaled = true;
+
+ if (SlotSyncCtx->pid == InvalidPid)
+ {
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ return;
+ }
+ SpinLockRelease(&SlotSyncCtx->mutex);
+
+ kill(SlotSyncCtx->pid, SIGINT);
+
+ /* Wait for it to die */
+ for (;;)
+ {
+ int rc;
+
+ /* Wait a bit, we don't expect to have to wait long */
+ rc = WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
+
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(MyLatch);
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ SpinLockAcquire(&SlotSyncCtx->mutex);
+
+ /* Is it gone? */
+ if (SlotSyncCtx->pid == InvalidPid)
+ break;
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+ }
+
+ SpinLockRelease(&SlotSyncCtx->mutex);
+}
+
+/*
+ * SlotSyncWorkerCanRestart
+ *
+ * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
+ * since it was launched last. Otherwise returns false.
+ *
+ * This is a safety valve to protect against continuous respawn attempts if the
+ * worker is dying immediately at launch. Note that since we retry launching
+ * the worker from the postmaster's main loop, we will get another chance
+ * later.
+ */
+bool
+SlotSyncWorkerCanRestart(void)
+{
+ time_t curtime = time(NULL);
+
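+ /*
+ * The unsigned casts below make a backwards system-clock jump look like a
+ * very large elapsed time, so such a jump allows an immediate restart
+ * rather than delaying it.
+ */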
+ /* Return false if too soon since last start. */
+ if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
+ (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
+ return false;
+
+ SlotSyncCtx->last_start_time = curtime;
+
+ return true;
}
/*
- * Is current process syncing replication slots ?
+ * Is current process syncing replication slots?
+ *
+ * Could be either a backend executing the SQL function or the slot sync
+ * worker.
*/
bool
IsSyncingReplicationSlots(void)
return syncing_slots;
}
+/*
+ * Is current process a slot sync worker?
+ */
+bool
+IsLogicalSlotSyncWorker(void)
+{
+ return am_slotsync_worker;
+}
+
/*
* Amount of shared memory required for slot synchronization.
*/
void
SlotSyncShmemInit(void)
{
+ Size size = SlotSyncShmemSize();
bool found;
SlotSyncCtx = (SlotSyncCtxStruct *)
- ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found);
+ ShmemInitStruct("Slot Sync Data", size, &found);
if (!found)
{
- SlotSyncCtx->syncing = false;
+ memset(SlotSyncCtx, 0, size);
+ SlotSyncCtx->pid = InvalidPid;
SpinLockInit(&SlotSyncCtx->mutex);
}
}