summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/twophase.c32
-rw-r--r--src/backend/access/transam/xact.c18
-rw-r--r--src/backend/access/transam/xlog.c2
-rw-r--r--src/backend/access/transam/xlogrecovery.c2
-rw-r--r--src/backend/catalog/pg_subscription.c1
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/commands/subscriptioncmds.c400
-rw-r--r--src/backend/replication/logical/applyparallelworker.c3
-rw-r--r--src/backend/replication/logical/launcher.c228
-rw-r--r--src/backend/replication/logical/reorderbuffer.c2
-rw-r--r--src/backend/replication/logical/tablesync.c3
-rw-r--r--src/backend/replication/logical/worker.c623
-rw-r--r--src/backend/replication/slot.c48
-rw-r--r--src/backend/replication/walsender.c60
-rw-r--r--src/backend/storage/ipc/procarray.c20
-rw-r--r--src/backend/utils/adt/pg_upgrade_support.c19
16 files changed, 1389 insertions, 75 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 85cbe397cb2..7918176fc58 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1183,7 +1183,11 @@ EndPrepare(GlobalTransaction gxact)
* starting immediately after the WAL record is inserted could complete
* without fsync'ing our state file. (This is essentially the same kind
* of race condition as the COMMIT-to-clog-write case that
- * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
+ * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+ * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
+ * the critical commit section. We need to know about such transactions
+ * for conflict detection in logical replication. See
+ * GetOldestActiveTransactionId(true, false) and its use.
*
* We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase.
@@ -2298,7 +2302,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
* RecordTransactionCommitPrepared
*
* This is basically the same as RecordTransactionCommit (q.v. if you change
- * this function): in particular, we must set DELAY_CHKPT_START to avoid a
+ * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
* race condition.
*
* We know the transaction made at least one XLOG entry (its PREPARE),
@@ -2318,7 +2322,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
const char *gid)
{
XLogRecPtr recptr;
- TimestampTz committs = GetCurrentTimestamp();
+ TimestampTz committs;
bool replorigin;
/*
@@ -2331,8 +2335,24 @@ RecordTransactionCommitPrepared(TransactionId xid,
START_CRIT_SECTION();
/* See notes in RecordTransactionCommit */
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ /*
+ * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
+ * commit time is written.
+ */
+ pg_write_barrier();
+
+ /*
+ * Note it is important to set committs value after marking ourselves as
+ * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
+ * we want to ensure all transactions that have acquired commit timestamp
+ * are finished before we allow the logical replication client to advance
+ * its xid which is used to hold back dead rows for conflict detection.
+ * See comments atop worker.c.
+ */
+ committs = GetCurrentTimestamp();
/*
* Emit the XLOG commit record. Note that we mark 2PC commits as
@@ -2381,7 +2401,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
TransactionIdCommitTree(xid, nchildren, children);
/* Checkpoint can proceed now */
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 41601fcb280..b46e7e9c2a6 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1431,10 +1431,22 @@ RecordTransactionCommit(void)
* without holding the ProcArrayLock, since we're the only one
* modifying it. This makes checkpoint's determination of which xacts
* are delaying the checkpoint a bit fuzzy, but it doesn't matter.
+ *
+ * Note, it is important to get the commit timestamp after marking the
+ * transaction in the commit critical section. See
+ * RecordTransactionCommitPrepared.
*/
- Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+ Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
START_CRIT_SECTION();
- MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+ MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+ Assert(xactStopTimestamp == 0);
+
+ /*
+ * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible
+ * before commit time is written.
+ */
+ pg_write_barrier();
/*
* Insert the commit XLOG record.
@@ -1537,7 +1549,7 @@ RecordTransactionCommit(void)
*/
if (markXidCommitted)
{
- MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+ MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
END_CRIT_SECTION();
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8e7827c6ed9..eefffc4277a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7121,7 +7121,7 @@ CreateCheckPoint(int flags)
* starting snapshot of locks and transactions.
*/
if (!shutdown && XLogStandbyInfoActive())
- checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
+ checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false, true);
else
checkPoint.oldestActiveXid = InvalidTransactionId;
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 23878b2dd91..e8f3ba00caa 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -4760,7 +4760,7 @@ bool
check_primary_slot_name(char **newval, void **extra, GucSource source)
{
if (*newval && strcmp(*newval, "") != 0 &&
- !ReplicationSlotValidateName(*newval, WARNING))
+ !ReplicationSlotValidateName(*newval, false, WARNING))
return false;
return true;
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1395032413e..63c2992d19f 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -103,6 +103,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->passwordrequired = subform->subpasswordrequired;
sub->runasowner = subform->subrunasowner;
sub->failover = subform->subfailover;
+ sub->retaindeadtuples = subform->subretaindeadtuples;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b2d5332effc..f6eca09ee15 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1386,7 +1386,8 @@ REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
subpasswordrequired, subrunasowner, subfailover,
- subslotname, subsynccommit, subpublications, suborigin)
+ subretaindeadtuples, subslotname, subsynccommit,
+ subpublications, suborigin)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index e23b0de7242..cd6c3684482 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -14,6 +14,7 @@
#include "postgres.h"
+#include "access/commit_ts.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/twophase.h"
@@ -71,8 +72,9 @@
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_FAILOVER 0x00002000
-#define SUBOPT_LSN 0x00004000
-#define SUBOPT_ORIGIN 0x00008000
+#define SUBOPT_RETAIN_DEAD_TUPLES 0x00004000
+#define SUBOPT_LSN 0x00008000
+#define SUBOPT_ORIGIN 0x00010000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -98,6 +100,7 @@ typedef struct SubOpts
bool passwordrequired;
bool runasowner;
bool failover;
+ bool retaindeadtuples;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -105,8 +108,10 @@ typedef struct SubOpts
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
static void check_publications_origin(WalReceiverConn *wrconn,
List *publications, bool copydata,
- char *origin, Oid *subrel_local_oids,
- int subrel_count, char *subname);
+ bool retain_dead_tuples, char *origin,
+ Oid *subrel_local_oids, int subrel_count,
+ char *subname);
+static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->runasowner = false;
if (IsSet(supported_opts, SUBOPT_FAILOVER))
opts->failover = false;
+ if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ opts->retaindeadtuples = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -210,7 +217,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (strcmp(opts->slot_name, "none") == 0)
opts->slot_name = NULL;
else
- ReplicationSlotValidateName(opts->slot_name, ERROR);
+ ReplicationSlotValidateName(opts->slot_name, false, ERROR);
}
else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
strcmp(defel->defname, "copy_data") == 0)
@@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_FAILOVER;
opts->failover = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
+ strcmp(defel->defname, "retain_dead_tuples") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
+ opts->retaindeadtuples = defGetBoolean(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+ SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -630,6 +647,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
stmt->subname)));
}
+ /* Ensure that we can enable retain_dead_tuples */
+ if (opts.retaindeadtuples)
+ CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
+
if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
opts.slot_name == NULL)
opts.slot_name = stmt->subname;
@@ -670,6 +691,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+ values[Anum_pg_subscription_subretaindeadtuples - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -722,7 +745,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
{
check_publications(wrconn, publications);
check_publications_origin(wrconn, publications, opts.copy_data,
- opts.origin, NULL, 0, stmt->subname);
+ opts.retaindeadtuples, opts.origin,
+ NULL, 0, stmt->subname);
+
+ if (opts.retaindeadtuples)
+ check_pub_dead_tuple_retention(wrconn);
/*
* Set sync state based on if we were asked to do data copy or
@@ -881,8 +908,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
sizeof(Oid), oid_cmp);
check_publications_origin(wrconn, sub->publications, copy_data,
- sub->origin, subrel_local_oids,
- subrel_count, sub->name);
+ sub->retaindeadtuples, sub->origin,
+ subrel_local_oids, subrel_count, sub->name);
/*
* Rels that we want to remove from subscription and drop any slots
@@ -1040,18 +1067,22 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
}
/*
- * Common checks for altering failover and two_phase options.
+ * Common checks for altering failover, two_phase, and retain_dead_tuples
+ * options.
*/
static void
CheckAlterSubOption(Subscription *sub, const char *option,
bool slot_needs_update, bool isTopLevel)
{
+ Assert(strcmp(option, "failover") == 0 ||
+ strcmp(option, "two_phase") == 0 ||
+ strcmp(option, "retain_dead_tuples") == 0);
+
/*
- * The checks in this function are required only for failover and
- * two_phase options.
+ * Altering the retain_dead_tuples option does not update the slot on the
+ * publisher.
*/
- Assert(strcmp(option, "failover") == 0 ||
- strcmp(option, "two_phase") == 0);
+ Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
/*
* Do not allow changing the option if the subscription is enabled. This
@@ -1063,6 +1094,39 @@ CheckAlterSubOption(Subscription *sub, const char *option,
* the publisher by the existing walsender, so we could have allowed that
* even when the subscription is enabled. But we kept this restriction for
* the sake of consistency and simplicity.
+ *
+ * Additionally, do not allow changing the retain_dead_tuples option when
+ * the subscription is enabled to prevent race conditions arising from the
+ * new option value being acknowledged asynchronously by the launcher and
+ * apply workers.
+ *
+ * Without the restriction, a race condition may arise when a user
+ * disables and immediately re-enables the retain_dead_tuples option. In
+ * this case, the launcher might drop the slot upon noticing the disabled
+ * action, while the apply worker may keep maintaining
+ * oldest_nonremovable_xid without noticing the option change. During this
+ * period, a transaction ID wraparound could falsely make this ID appear
+ * as if it originates from the future w.r.t the transaction ID stored in
+ * the slot maintained by launcher.
+ *
+ * Similarly, if the user enables retain_dead_tuples concurrently with the
+ * launcher starting the worker, the apply worker may start calculating
+ * oldest_nonremovable_xid before the launcher notices the enable action.
+ * Consequently, the launcher may update slot.xmin to a newer value than
+ * that maintained by the worker. In subsequent cycles, upon integrating
+ * the worker's oldest_nonremovable_xid, the launcher might detect a
+ * retreat in the calculated xmin, necessitating additional handling.
+ *
+ * XXX To address the above race conditions, we can define
+ * oldest_nonremovable_xid as FullTransactionID and adds the check to
+ * disallow retreating the conflict slot's xmin. For now, we kept the
+ * implementation simple by disallowing change to the retain_dead_tuples,
+ * but in the future we can change this after some more analysis.
+ *
+ * Note that we could restrict only the enabling of retain_dead_tuples to
+ * avoid the race conditions described above, but we maintain the
+ * restriction for both enable and disable operations for the sake of
+ * consistency.
*/
if (sub->enabled)
ereport(ERROR,
@@ -1110,6 +1174,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
bool update_tuple = false;
bool update_failover = false;
bool update_two_phase = false;
+ bool check_pub_rdt = false;
+ bool retain_dead_tuples;
+ char *origin;
Subscription *sub;
Form_pg_subscription form;
bits32 supported_opts;
@@ -1137,6 +1204,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
sub = GetSubscription(subid, false);
+ retain_dead_tuples = sub->retaindeadtuples;
+ origin = sub->origin;
+
/*
* Don't allow non-superuser modification of a subscription with
* password_required=false.
@@ -1165,7 +1235,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_ORIGIN);
+ SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
@@ -1325,11 +1395,62 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subfailover - 1] = true;
}
+ if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+ {
+ values[Anum_pg_subscription_subretaindeadtuples - 1] =
+ BoolGetDatum(opts.retaindeadtuples);
+ replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
+
+ CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
+
+ /*
+ * Workers may continue running even after the
+ * subscription has been disabled.
+ *
+ * To prevent race conditions (as described in
+ * CheckAlterSubOption()), ensure that all worker
+ * processes have already exited before proceeding.
+ */
+ if (logicalrep_workers_find(subid, true, true))
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
+ errhint("Try again after some time.")));
+
+ /*
+ * Remind the user that enabling subscription will prevent
+ * the accumulation of dead tuples.
+ */
+ if (opts.retaindeadtuples)
+ CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
+
+ /*
+ * Notify the launcher to manage the replication slot for
+ * conflict detection. This ensures that replication slot
+ * is efficiently handled (created, updated, or dropped)
+ * in response to any configuration changes.
+ */
+ ApplyLauncherWakeupAtCommit();
+
+ check_pub_rdt = opts.retaindeadtuples;
+ retain_dead_tuples = opts.retaindeadtuples;
+ }
+
if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
CStringGetTextDatum(opts.origin);
replaces[Anum_pg_subscription_suborigin - 1] = true;
+
+ /*
+ * Check if changes from different origins may be received
+ * from the publisher when the origin is changed to ANY
+ * and retain_dead_tuples is enabled.
+ */
+ check_pub_rdt = retain_dead_tuples &&
+ pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
+
+ origin = opts.origin;
}
update_tuple = true;
@@ -1347,6 +1468,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot enable subscription that does not have a slot name")));
+ /*
+ * Check track_commit_timestamp only when enabling the
+ * subscription in case it was disabled after creation. See
+ * comments atop CheckSubDeadTupleRetention() for details.
+ */
+ if (sub->retaindeadtuples)
+ CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
+ WARNING);
+
values[Anum_pg_subscription_subenabled - 1] =
BoolGetDatum(opts.enabled);
replaces[Anum_pg_subscription_subenabled - 1] = true;
@@ -1355,6 +1485,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
ApplyLauncherWakeupAtCommit();
update_tuple = true;
+
+ /*
+ * The subscription might be initially created with
+ * connect=false and retain_dead_tuples=true, meaning the
+ * remote server's status may not be checked. Ensure this
+ * check is conducted now.
+ */
+ check_pub_rdt = sub->retaindeadtuples && opts.enabled;
break;
}
@@ -1369,6 +1507,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
CStringGetTextDatum(stmt->conninfo);
replaces[Anum_pg_subscription_subconninfo - 1] = true;
update_tuple = true;
+
+ /*
+ * Since the remote server configuration might have changed,
+ * perform a check to ensure it permits enabling
+ * retain_dead_tuples.
+ */
+ check_pub_rdt = sub->retaindeadtuples;
break;
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
@@ -1568,14 +1713,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
}
/*
- * Try to acquire the connection necessary for altering the slot, if
- * needed.
+ * Try to acquire the connection necessary either for modifying the slot
+ * or for checking if the remote server permits enabling
+ * retain_dead_tuples.
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won't be able to rollback altered
* slot.
*/
- if (update_failover || update_two_phase)
+ if (update_failover || update_two_phase || check_pub_rdt)
{
bool must_use_password;
char *err;
@@ -1584,10 +1730,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
- /* Try to connect to the publisher. */
+ /*
+ * Try to connect to the publisher, using the new connection string if
+ * available.
+ */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
- wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
- sub->name, &err);
+ wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
+ true, true, must_use_password, sub->name,
+ &err);
if (!wrconn)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
@@ -1596,9 +1746,17 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
PG_TRY();
{
- walrcv_alter_slot(wrconn, sub->slotname,
- update_failover ? &opts.failover : NULL,
- update_two_phase ? &opts.twophase : NULL);
+ if (retain_dead_tuples)
+ check_pub_dead_tuple_retention(wrconn);
+
+ check_publications_origin(wrconn, sub->publications, false,
+ retain_dead_tuples, origin, NULL, 0,
+ sub->name);
+
+ if (update_failover || update_two_phase)
+ walrcv_alter_slot(wrconn, sub->slotname,
+ update_failover ? &opts.failover : NULL,
+ update_two_phase ? &opts.twophase : NULL);
}
PG_FINALLY();
{
@@ -2086,20 +2244,29 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
* Check and log a warning if the publisher has subscribed to the same table,
* its partition ancestors (if it's a partition), or its partition children (if
* it's a partitioned table), from some other publishers. This check is
- * required only if "copy_data = true" and "origin = none" for CREATE
- * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the
- * user that data having origin might have been copied.
+ * required in the following scenarios:
*
- * This check need not be performed on the tables that are already added
- * because incremental sync for those tables will happen through WAL and the
- * origin of the data can be identified from the WAL records.
+ * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ * with "copy_data = true" and "origin = none":
+ * - Warn the user that data with an origin might have been copied.
+ * - This check is skipped for tables already added, as incremental sync via
+ * WAL allows origin tracking. The list of such tables is in
+ * subrel_local_oids.
*
- * subrel_local_oids contains the list of relation oids that are already
- * present on the subscriber.
+ * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ * with "retain_dead_tuples = true" and "origin = any", and for ALTER
+ * SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
+ * when the publisher's status changes (e.g., due to a connection string
+ * update):
+ * - Warn the user that only conflict detection info for local changes on
+ * the publisher is retained. Data from other origins may lack sufficient
+ * details for reliable conflict detection.
+ * - See comments atop worker.c for more details.
*/
static void
check_publications_origin(WalReceiverConn *wrconn, List *publications,
- bool copydata, char *origin, Oid *subrel_local_oids,
+ bool copydata, bool retain_dead_tuples,
+ char *origin, Oid *subrel_local_oids,
int subrel_count, char *subname)
{
WalRcvExecResult *res;
@@ -2108,9 +2275,29 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
Oid tableRow[1] = {TEXTOID};
List *publist = NIL;
int i;
+ bool check_rdt;
+ bool check_table_sync;
+ bool origin_none = origin &&
+ pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
+
+ /*
+ * Enable retain_dead_tuples checks only when origin is set to 'any',
+ * since with origin='none' only local changes are replicated to the
+ * subscriber.
+ */
+ check_rdt = retain_dead_tuples && !origin_none;
+
+ /*
+ * Enable table synchronization checks only when origin is 'none', to
+ * ensure that data from other origins is not inadvertently copied.
+ */
+ check_table_sync = copydata && origin_none;
- if (!copydata || !origin ||
- (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+ /* retain_dead_tuples and table sync checks occur separately */
+ Assert(!(check_rdt && check_table_sync));
+
+ /* Return if no checks are required */
+ if (!check_rdt && !check_table_sync)
return;
initStringInfo(&cmd);
@@ -2129,16 +2316,23 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
/*
* In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
* the list of relation oids that are already present on the subscriber.
- * This check should be skipped for these tables.
+ * This check should be skipped for these tables if checking for table
+ * sync scenario. However, when handling the retain_dead_tuples scenario,
+ * ensure all tables are checked, as some existing tables may now include
+ * changes from other origins due to newly created subscriptions on the
+ * publisher.
*/
- for (i = 0; i < subrel_count; i++)
+ if (check_table_sync)
{
- Oid relid = subrel_local_oids[i];
- char *schemaname = get_namespace_name(get_rel_namespace(relid));
- char *tablename = get_rel_name(relid);
+ for (i = 0; i < subrel_count; i++)
+ {
+ Oid relid = subrel_local_oids[i];
+ char *schemaname = get_namespace_name(get_rel_namespace(relid));
+ char *tablename = get_rel_name(relid);
- appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
- schemaname, tablename);
+ appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+ schemaname, tablename);
+ }
}
res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
@@ -2173,22 +2367,37 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
* XXX: For simplicity, we don't check whether the table has any data or
* not. If the table doesn't have any data then we don't need to
* distinguish between data having origin and data not having origin so we
- * can avoid logging a warning in that case.
+ * can avoid logging a warning for table sync scenario.
*/
if (publist)
{
StringInfo pubnames = makeStringInfo();
+ StringInfo err_msg = makeStringInfo();
+ StringInfo err_hint = makeStringInfo();
/* Prepare the list of publication(s) for warning message. */
GetPublicationsStr(publist, pubnames, false);
+
+ if (check_table_sync)
+ {
+ appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
+ subname);
+ appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
+ }
+ else
+ {
+ appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
+ subname);
+ appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
+ }
+
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
- subname),
- errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
- "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
+ errmsg_internal("%s", err_msg->data),
+ errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
+ "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
list_length(publist), pubnames->data),
- errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
+ errhint_internal("%s", err_hint->data));
}
ExecDropSingleTupleTableSlot(slot);
@@ -2197,6 +2406,101 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
}
/*
+ * Determine whether the retain_dead_tuples can be enabled based on the
+ * publisher's status.
+ *
+ * This option is disallowed if the publisher is running a version earlier
+ * than the PG19, or if the publisher is in recovery (i.e., it is a standby
+ * server).
+ *
+ * See comments atop worker.c for a detailed explanation.
+ */
+static void
+check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
+{
+ WalRcvExecResult *res;
+ Oid RecoveryRow[1] = {BOOLOID};
+ TupleTableSlot *slot;
+ bool isnull;
+ bool remote_in_recovery;
+
+ if (walrcv_server_version(wrconn) < 19000)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
+
+ res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
+
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not obtain recovery progress from the publisher: %s",
+ res->err)));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ elog(ERROR, "failed to fetch tuple for the recovery progress");
+
+ remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
+
+ if (remote_in_recovery)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
+
+ ExecDropSingleTupleTableSlot(slot);
+
+ walrcv_clear_result(res);
+}
+
+/*
+ * Check if the subscriber's configuration is adequate to enable the
+ * retain_dead_tuples option.
+ *
+ * Issue an ERROR if the wal_level does not support the use of replication
+ * slots when check_guc is set to true.
+ *
+ * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
+ * set to true. This is only to highlight the importance of enabling
+ * track_commit_timestamp instead of catching all the misconfigurations, as
+ * this setting can be adjusted after subscription creation. Without it, the
+ * apply worker will simply skip conflict detection.
+ *
+ * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
+ * ERROR since users can only modify retain_dead_tuples for disabled
+ * subscriptions. And as long as the subscription is enabled promptly, it will
+ * not pose issues.
+ */
+void
+CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
+ int elevel_for_sub_disabled)
+{
+ Assert(elevel_for_sub_disabled == NOTICE ||
+ elevel_for_sub_disabled == WARNING);
+
+ if (check_guc && wal_level < WAL_LEVEL_REPLICA)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
+ errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
+
+ if (check_guc && !track_commit_timestamp)
+ ereport(WARNING,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
+ errhint("Consider setting \"%s\" to true.",
+ "track_commit_timestamp"));
+
+ if (sub_disabled)
+ ereport(elevel_for_sub_disabled,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
+ (elevel_for_sub_disabled > NOTICE)
+ ? errhint("Consider setting %s to false.",
+ "retain_dead_tuples") : 0);
+}
+
+/*
* Get the list of tables which belong to specified publications on the
* publisher connection.
*
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..1fa931a7422 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -441,7 +441,8 @@ pa_launch_parallel_worker(void)
MySubscription->name,
MyLogicalRepWorker->userid,
InvalidOid,
- dsm_segment_handle(winfo->dsm_seg));
+ dsm_segment_handle(winfo->dsm_seg),
+ false);
if (launched)
{
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..742d9ba68e9 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -32,6 +32,7 @@
#include "postmaster/interrupt.h"
#include "replication/logicallauncher.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
@@ -91,7 +92,6 @@ static dshash_table *last_start_times = NULL;
static bool on_commit_launcher_wakeup = false;
-static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
@@ -100,6 +100,9 @@ static int logicalrep_pa_worker_count(Oid subid);
static void logicalrep_launcher_attach_dshmem(void);
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
+static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
+static bool acquire_conflict_slot_if_exists(void);
+static void advance_conflict_slot_xmin(TransactionId new_xmin);
/*
@@ -148,6 +151,7 @@ get_subscription_list(void)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->name = pstrdup(NameStr(subform->subname));
+ sub->retaindeadtuples = subform->subretaindeadtuples;
/* We don't fill fields we are not interested in. */
res = lappend(res, sub);
@@ -309,7 +313,8 @@ logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock)
bool
logicalrep_worker_launch(LogicalRepWorkerType wtype,
Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid, dsm_handle subworker_dsm)
+ Oid relid, dsm_handle subworker_dsm,
+ bool retain_dead_tuples)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -328,10 +333,13 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
* - must be valid worker type
* - tablesync workers are only ones to have relid
* - parallel apply worker is the only kind of subworker
+ * - The replication slot used in conflict detection is created when
+ * retain_dead_tuples is enabled
*/
Assert(wtype != WORKERTYPE_UNKNOWN);
Assert(is_tablesync_worker == OidIsValid(relid));
Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+ Assert(!retain_dead_tuples || MyReplicationSlot);
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -454,6 +462,9 @@ retry:
worker->stream_fileset = NULL;
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
+ worker->oldest_nonremovable_xid = retain_dead_tuples
+ ? MyReplicationSlot->data.xmin
+ : InvalidTransactionId;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -1118,7 +1129,10 @@ ApplyLauncherWakeupAtCommit(void)
on_commit_launcher_wakeup = true;
}
-static void
+/*
+ * Wakeup the launcher immediately.
+ */
+void
ApplyLauncherWakeup(void)
{
if (LogicalRepCtx->launcher_pid != 0)
@@ -1150,6 +1164,12 @@ ApplyLauncherMain(Datum main_arg)
*/
BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+ /*
+ * Acquire the conflict detection slot at startup to ensure it can be
+ * dropped if no longer needed after a restart.
+ */
+ acquire_conflict_slot_if_exists();
+
/* Enter main loop */
for (;;)
{
@@ -1159,6 +1179,9 @@ ApplyLauncherMain(Datum main_arg)
MemoryContext subctx;
MemoryContext oldctx;
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+ bool can_advance_xmin = true;
+ bool retain_dead_tuples = false;
+ TransactionId xmin = InvalidTransactionId;
CHECK_FOR_INTERRUPTS();
@@ -1168,7 +1191,14 @@ ApplyLauncherMain(Datum main_arg)
ALLOCSET_DEFAULT_SIZES);
oldctx = MemoryContextSwitchTo(subctx);
- /* Start any missing workers for enabled subscriptions. */
+ /*
+ * Start any missing workers for enabled subscriptions.
+ *
+ * Also, during the iteration through all subscriptions, we compute
+ * the minimum XID required to protect deleted tuples for conflict
+ * detection if one of the subscription enables retain_dead_tuples
+ * option.
+ */
sublist = get_subscription_list();
foreach(lc, sublist)
{
@@ -1178,6 +1208,38 @@ ApplyLauncherMain(Datum main_arg)
TimestampTz now;
long elapsed;
+ if (sub->retaindeadtuples)
+ {
+ retain_dead_tuples = true;
+
+ /*
+ * Can't advance xmin of the slot unless all the subscriptions
+ * with retain_dead_tuples are enabled. This is required to
+ * ensure that we don't advance the xmin of
+ * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
+ * enabled. Otherwise, we won't be able to detect conflicts
+ * reliably for such a subscription even though it has set the
+ * retain_dead_tuples option.
+ */
+ can_advance_xmin &= sub->enabled;
+
+ /*
+ * Create a replication slot to retain information necessary
+ * for conflict detection such as dead tuples, commit
+ * timestamps, and origins.
+ *
+ * The slot is created before starting the apply worker to
+ * prevent it from unnecessarily maintaining its
+ * oldest_nonremovable_xid.
+ *
+ * The slot is created even for a disabled subscription to
+ * ensure that conflict-related information is available when
+ * applying remote changes that occurred before the
+ * subscription was enabled.
+ */
+ CreateConflictDetectionSlot();
+ }
+
if (!sub->enabled)
continue;
@@ -1186,7 +1248,27 @@ ApplyLauncherMain(Datum main_arg)
LWLockRelease(LogicalRepWorkerLock);
if (w != NULL)
- continue; /* worker is running already */
+ {
+ /*
+ * Compute the minimum xmin required to protect dead tuples
+ * required for conflict detection among all running apply
+ * workers that enables retain_dead_tuples.
+ */
+ if (sub->retaindeadtuples && can_advance_xmin)
+ compute_min_nonremovable_xid(w, &xmin);
+
+ /* worker is running already */
+ continue;
+ }
+
+ /*
+ * Can't advance xmin of the slot unless all the workers
+ * corresponding to subscriptions with retain_dead_tuples are
+ * running, disabling the further computation of the minimum
+ * nonremovable xid.
+ */
+ if (sub->retaindeadtuples)
+ can_advance_xmin = false;
/*
* If the worker is eligible to start now, launch it. Otherwise,
@@ -1210,7 +1292,8 @@ ApplyLauncherMain(Datum main_arg)
if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
- DSM_HANDLE_INVALID))
+ DSM_HANDLE_INVALID,
+ sub->retaindeadtuples))
{
/*
* We get here either if we failed to launch a worker
@@ -1230,6 +1313,20 @@ ApplyLauncherMain(Datum main_arg)
}
}
+ /*
+ * Drop the CONFLICT_DETECTION_SLOT slot if there is no subscription
+ * that requires us to retain dead tuples. Otherwise, if required,
+ * advance the slot's xmin to protect dead tuples required for the
+ * conflict detection.
+ */
+ if (MyReplicationSlot)
+ {
+ if (!retain_dead_tuples)
+ ReplicationSlotDropAcquired();
+ else if (can_advance_xmin)
+ advance_conflict_slot_xmin(xmin);
+ }
+
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */
@@ -1258,6 +1355,125 @@ ApplyLauncherMain(Datum main_arg)
}
/*
+ * Determine the minimum non-removable transaction ID across all apply workers
+ * for subscriptions that have retain_dead_tuples enabled. Store the result
+ * in *xmin.
+ */
+static void
+compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
+{
+ TransactionId nonremovable_xid;
+
+ Assert(worker != NULL);
+
+ /*
+ * The replication slot for conflict detection must be created before the
+ * worker starts.
+ */
+ Assert(MyReplicationSlot);
+
+ SpinLockAcquire(&worker->relmutex);
+ nonremovable_xid = worker->oldest_nonremovable_xid;
+ SpinLockRelease(&worker->relmutex);
+
+ Assert(TransactionIdIsValid(nonremovable_xid));
+
+ if (!TransactionIdIsValid(*xmin) ||
+ TransactionIdPrecedes(nonremovable_xid, *xmin))
+ *xmin = nonremovable_xid;
+}
+
+/*
+ * Acquire the replication slot used to retain information for conflict
+ * detection, if it exists.
+ *
+ * Return true if successfully acquired, otherwise return false.
+ */
+static bool
+acquire_conflict_slot_if_exists(void)
+{
+ if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
+ return false;
+
+ ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
+ return true;
+}
+
+/*
+ * Advance the xmin the replication slot used to retain information required
+ * for conflict detection.
+ */
+static void
+advance_conflict_slot_xmin(TransactionId new_xmin)
+{
+ Assert(MyReplicationSlot);
+ Assert(TransactionIdIsValid(new_xmin));
+ Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+
+ /* Return if the xmin value of the slot cannot be advanced */
+ if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
+ return;
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = new_xmin;
+ MyReplicationSlot->data.xmin = new_xmin;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
+
+ ReplicationSlotMarkDirty();
+ ReplicationSlotsComputeRequiredXmin(false);
+
+ /*
+ * Like PhysicalConfirmReceivedLocation(), do not save slot information
+ * each time. This is acceptable because all concurrent transactions on
+ * the publisher that require the data preceding the slot's xmin should
+ * have already been applied and flushed on the subscriber before the xmin
+ * is advanced. So, even if the slot's xmin regresses after a restart, it
+ * will be advanced again in the next cycle. Therefore, no data required
+ * for conflict detection will be prematurely removed.
+ */
+ return;
+}
+
+/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if not yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+ TransactionId xmin_horizon;
+
+ /* Exit early, if the replication slot is already created and acquired */
+ if (MyReplicationSlot)
+ return;
+
+ ereport(LOG,
+ errmsg("creating replication conflict detection slot"));
+
+ ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+ false, false);
+
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+ xmin_horizon = GetOldestSafeDecodingTransactionId(false);
+
+ SpinLockAcquire(&MyReplicationSlot->mutex);
+ MyReplicationSlot->effective_xmin = xmin_horizon;
+ MyReplicationSlot->data.xmin = xmin_horizon;
+ SpinLockRelease(&MyReplicationSlot->mutex);
+
+ ReplicationSlotsComputeRequiredXmin(true);
+
+ LWLockRelease(ProcArrayLock);
+
+ /* Write this slot to disk */
+ ReplicationSlotMarkDirty();
+ ReplicationSlotSave();
+}
+
+/*
* Is current process the logical replication launcher?
*/
bool
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7b4e8629553..5febd154b6b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -4917,7 +4917,7 @@ StartupReorderBuffer(void)
continue;
/* if it cannot be a slot, skip the directory */
- if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
+ if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
continue;
/*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e4fd6347fd1..3fea0a0206e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -615,7 +615,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->name,
MyLogicalRepWorker->userid,
rstate->relid,
- DSM_HANDLE_INVALID);
+ DSM_HANDLE_INVALID,
+ false);
}
}
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c5fb627aa56..b59221c4d06 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -132,6 +132,96 @@
* failover = true when creating the subscription. Enabling failover allows us
* to smoothly transition to the promoted standby, ensuring that we can
* subscribe to the new primary without losing any data.
+ *
+ * RETAIN DEAD TUPLES
+ * ----------------------
+ * Each apply worker that enabled retain_dead_tuples option maintains a
+ * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
+ * prevent dead rows from being removed prematurely when the apply worker still
+ * needs them to detect conflicts reliably. This helps to retain the required
+ * commit_ts module information, which further helps to detect
+ * update_origin_differs and delete_origin_differs conflicts reliably, as
+ * otherwise, vacuum freeze could remove the required information.
+ *
+ * The logical replication launcher manages an internal replication slot named
+ * "pg_conflict_detection". It asynchronously aggregates the non-removable
+ * transaction ID from all apply workers to determine the appropriate xmin for
+ * the slot, thereby retaining necessary tuples.
+ *
+ * The non-removable transaction ID in the apply worker is advanced to the
+ * oldest running transaction ID once all concurrent transactions on the
+ * publisher have been applied and flushed locally. The process involves:
+ *
+ * - RDT_GET_CANDIDATE_XID:
+ * Call GetOldestActiveTransactionId() to take oldestRunningXid as the
+ * candidate xid.
+ *
+ * - RDT_REQUEST_PUBLISHER_STATUS:
+ * Send a message to the walsender requesting the publisher status, which
+ * includes the latest WAL write position and information about transactions
+ * that are in the commit phase.
+ *
+ * - RDT_WAIT_FOR_PUBLISHER_STATUS:
+ * Wait for the status from the walsender. After receiving the first status,
+ * do not proceed if there are concurrent remote transactions that are still
+ * in the commit phase. These transactions might have been assigned an
+ * earlier commit timestamp but have not yet written the commit WAL record.
+ * Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
+ * until all these transactions have completed.
+ *
+ * - RDT_WAIT_FOR_LOCAL_FLUSH:
+ * Advance the non-removable transaction ID if the current flush location has
+ * reached or surpassed the last received WAL position.
+ *
+ * The overall state progression is: GET_CANDIDATE_XID ->
+ * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
+ * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
+ *
+ * Retaining the dead tuples for this period is sufficient for ensuring
+ * eventual consistency using last-update-wins strategy, as dead tuples are
+ * useful for detecting conflicts only during the application of concurrent
+ * transactions from remote nodes. After applying and flushing all remote
+ * transactions that occurred concurrently with the tuple DELETE, any
+ * subsequent UPDATE from a remote node should have a later timestamp. In such
+ * cases, it is acceptable to detect an update_missing scenario and convert the
+ * UPDATE to an INSERT when applying it. But, detecting concurrent remote
+ * transactions with earlier timestamps than the DELETE is necessary, as the
+ * UPDATEs in remote transactions should be ignored if their timestamp is
+ * earlier than that of the dead tuples.
+ *
+ * Note that advancing the non-removable transaction ID is not supported if the
+ * publisher is also a physical standby. This is because the logical walsender
+ * on the standby can only get the WAL replay position but there may be more
+ * WALs that are being replicated from the primary and those WALs could have
+ * earlier commit timestamp.
+ *
+ * Similarly, when the publisher has subscribed to another publisher,
+ * information necessary for conflict detection cannot be retained for
+ * changes from origins other than the publisher. This is because publisher
+ * lacks the information on concurrent transactions of other publishers to
+ * which it subscribes. As the information on concurrent transactions is
+ * unavailable beyond subscriber's immediate publishers, the non-removable
+ * transaction ID might be advanced prematurely before changes from other
+ * origins have been fully applied.
+ *
+ * XXX Retaining information for changes from other origins might be possible
+ * by requesting the subscription on that origin to enable retain_dead_tuples
+ * and fetching the conflict detection slot.xmin along with the publisher's
+ * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
+ * wait for the remote slot's xmin to reach the oldest active transaction ID,
+ * ensuring that all transactions from other origins have been applied on the
+ * publisher, thereby getting the latest WAL position that includes all
+ * concurrent changes. However, this approach may impact performance, so it
+ * might not worth the effort.
+ *
+ * XXX It seems feasible to get the latest commit's WAL location from the
+ * publisher and wait till that is applied. However, we can't do that
+ * because commit timestamps can regress as a commit with a later LSN is not
+ * guaranteed to have a later timestamp than those with earlier LSNs. Having
+ * said that, even if that is possible, it won't improve performance much as
+ * the apply always lag and moves slowly as compared with the transactions
+ * on the publisher.
*-------------------------------------------------------------------------
*/
@@ -140,6 +230,7 @@
#include <sys/stat.h>
#include <unistd.h>
+#include "access/commit_ts.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/twophase.h"
@@ -148,6 +239,7 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "commands/subscriptioncmds.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
@@ -166,12 +258,14 @@
#include "replication/logicalrelation.h"
#include "replication/logicalworker.h"
#include "replication/origin.h"
+#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
#include "storage/buffile.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/procarray.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/dynahash.h"
@@ -268,6 +362,78 @@ typedef enum
TRANS_PARALLEL_APPLY,
} TransApplyAction;
+/*
+ * The phases involved in advancing the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details of the transition between these
+ * phases.
+ */
+typedef enum
+{
+ RDT_GET_CANDIDATE_XID,
+ RDT_REQUEST_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_PUBLISHER_STATUS,
+ RDT_WAIT_FOR_LOCAL_FLUSH
+} RetainDeadTuplesPhase;
+
+/*
+ * Critical information for managing phase transitions within the
+ * RetainDeadTuplesPhase.
+ */
+typedef struct RetainDeadTuplesData
+{
+ RetainDeadTuplesPhase phase; /* current phase */
+ XLogRecPtr remote_lsn; /* WAL write position on the publisher */
+
+ /*
+ * Oldest transaction ID that was in the commit phase on the publisher.
+ * Use FullTransactionId to prevent issues with transaction ID wraparound,
+ * where a new remote_oldestxid could falsely appear to originate from the
+ * past and block advancement.
+ */
+ FullTransactionId remote_oldestxid;
+
+ /*
+ * Next transaction ID to be assigned on the publisher. Use
+ * FullTransactionId for consistency and to allow straightforward
+ * comparisons with remote_oldestxid.
+ */
+ FullTransactionId remote_nextxid;
+
+ TimestampTz reply_time; /* when the publisher responds with status */
+
+ /*
+ * Publisher transaction ID that must be awaited to complete before
+ * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
+ * FullTransactionId for the same reason as remote_nextxid.
+ */
+ FullTransactionId remote_wait_for;
+
+ TransactionId candidate_xid; /* candidate for the non-removable
+ * transaction ID */
+ TimestampTz flushpos_update_time; /* when the remote flush position was
+ * updated in final phase
+ * (RDT_WAIT_FOR_LOCAL_FLUSH) */
+
+ /*
+ * The following fields are used to determine the timing for the next
+ * round of transaction ID advancement.
+ */
+ TimestampTz last_recv_time; /* when the last message was received */
+ TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
+ int xid_advance_interval; /* how much time (ms) to wait before
+ * attempting to advance the
+ * non-removable transaction ID */
+} RetainDeadTuplesData;
+
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs. The maximum interval is a bit arbitrary but
+ * is sufficient to not cause any undue network traffic.
+ */
+#define MIN_XID_ADVANCE_INTERVAL 100
+#define MAX_XID_ADVANCE_INTERVAL 180000
+
/* errcontext tracker */
static ApplyErrorCallbackArg apply_error_callback_arg =
{
@@ -332,6 +498,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
+/*
+ * The remote WAL position that has been applied and flushed locally. We record
+ * and use this information both while sending feedback to the server and
+ * advancing oldest_nonremovable_xid.
+ */
+static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
typedef struct SubXactInfo
{
TransactionId xid; /* XID of the subxact */
@@ -372,6 +545,19 @@ static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
+static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
+static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
+static void request_publisher_status(RetainDeadTuplesData *rdt_data);
+static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received);
+static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
+ bool new_xid_found);
+
static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
static void apply_handle_insert_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
@@ -3577,6 +3763,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ RetainDeadTuplesData rdt_data = {0};
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3655,6 +3842,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
+ rdt_data.last_recv_time = last_recv_timestamp;
+
/* Ensure we are reading the data into our memory context. */
MemoryContextSwitchTo(ApplyMessageContext);
@@ -3681,6 +3870,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, send_time, false);
apply_dispatch(&s);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
}
else if (c == 'k')
{
@@ -3696,8 +3887,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
last_received = end_lsn;
send_feedback(last_received, reply_requested, false);
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
UpdateWorkerStats(last_received, timestamp, true);
}
+ else if (c == 's') /* Primary status update */
+ {
+ rdt_data.remote_lsn = pq_getmsgint64(&s);
+ rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+ rdt_data.reply_time = pq_getmsgint64(&s);
+
+ /*
+ * This should never happen, see
+ * ProcessStandbyPSRequestMessage. But if it happens
+ * due to a bug, we don't want to proceed as it can
+ * incorrectly advance oldest_nonremovable_xid.
+ */
+ if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
+ elog(ERROR, "cannot get the latest WAL position from the publisher");
+
+ maybe_advance_nonremovable_xid(&rdt_data, true);
+
+ UpdateWorkerStats(last_received, rdt_data.reply_time, false);
+ }
/* other message types are purposefully ignored */
MemoryContextReset(ApplyMessageContext);
@@ -3710,6 +3924,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* confirm all writes so far */
send_feedback(last_received, false, false);
+ /* Reset the timestamp if no message was received */
+ rdt_data.last_recv_time = 0;
+
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
if (!in_remote_transaction && !in_streamed_transaction)
{
/*
@@ -3744,6 +3963,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
else
wait_time = NAPTIME_PER_CYCLE;
+ /*
+ * Ensure to wake up when it's possible to advance the non-removable
+ * transaction ID.
+ */
+ if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
+ rdt_data.xid_advance_interval)
+ wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -3807,6 +4034,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
+ maybe_advance_nonremovable_xid(&rdt_data, false);
+
/*
* Force reporting to ensure long idle periods don't lead to
* arbitrarily delayed stats. Stats can only be reported outside
@@ -3842,7 +4071,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
static XLogRecPtr last_writepos = InvalidXLogRecPtr;
- static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
XLogRecPtr writepos;
XLogRecPtr flushpos;
@@ -3921,6 +4149,367 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
}
/*
+ * Attempt to advance the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details.
+ */
+static void
+maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ if (!can_advance_nonremovable_xid(rdt_data))
+ return;
+
+ process_rdt_phase_transition(rdt_data, status_received);
+}
+
+/*
+ * Preliminary check to determine if advancing the non-removable transaction ID
+ * is allowed.
+ */
+static bool
+can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
+{
+ /*
+ * It is sufficient to manage non-removable transaction ID for a
+ * subscription by the main apply worker to detect conflicts reliably even
+ * for table sync or parallel apply workers.
+ */
+ if (!am_leader_apply_worker())
+ return false;
+
+ /* No need to advance if retaining dead tuples is not required */
+ if (!MySubscription->retaindeadtuples)
+ return false;
+
+ return true;
+}
+
+/*
+ * Process phase transitions during the non-removable transaction ID
+ * advancement. See comments atop worker.c for details of the transition.
+ */
+static void
+process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ switch (rdt_data->phase)
+ {
+ case RDT_GET_CANDIDATE_XID:
+ get_candidate_xid(rdt_data);
+ break;
+ case RDT_REQUEST_PUBLISHER_STATUS:
+ request_publisher_status(rdt_data);
+ break;
+ case RDT_WAIT_FOR_PUBLISHER_STATUS:
+ wait_for_publisher_status(rdt_data, status_received);
+ break;
+ case RDT_WAIT_FOR_LOCAL_FLUSH:
+ wait_for_local_flush(rdt_data);
+ break;
+ }
+}
+
+/*
+ * Workhorse for the RDT_GET_CANDIDATE_XID phase.
+ */
+static void
+get_candidate_xid(RetainDeadTuplesData *rdt_data)
+{
+ TransactionId oldest_running_xid;
+ TimestampTz now;
+
+ /*
+ * Use last_recv_time when applying changes in the loop to avoid
+ * unnecessary system time retrieval. If last_recv_time is not available,
+ * obtain the current timestamp.
+ */
+ now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+ /*
+ * Compute the candidate_xid and request the publisher status at most once
+ * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
+ * details on how this value is dynamically adjusted. This is to avoid
+ * using CPU and network resources without making much progress.
+ */
+ if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+ rdt_data->xid_advance_interval))
+ return;
+
+ /*
+ * Immediately update the timer, even if the function returns later
+ * without setting candidate_xid due to inactivity on the subscriber. This
+ * avoids frequent calls to GetOldestActiveTransactionId.
+ */
+ rdt_data->candidate_xid_time = now;
+
+ /*
+ * Consider transactions in the current database, as only dead tuples from
+ * this database are required for conflict detection.
+ */
+ oldest_running_xid = GetOldestActiveTransactionId(false, false);
+
+ /*
+ * Oldest active transaction ID (oldest_running_xid) can't be behind any
+ * of its previously computed value.
+ */
+ Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid));
+
+ /* Return if the oldest_nonremovable_xid cannot be advanced */
+ if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+ oldest_running_xid))
+ {
+ adjust_xid_advance_interval(rdt_data, false);
+ return;
+ }
+
+ adjust_xid_advance_interval(rdt_data, true);
+
+ rdt_data->candidate_xid = oldest_running_xid;
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
+ */
+static void
+request_publisher_status(RetainDeadTuplesData *rdt_data)
+{
+ static StringInfo request_message = NULL;
+
+ if (!request_message)
+ {
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ request_message = makeStringInfo();
+ MemoryContextSwitchTo(oldctx);
+ }
+ else
+ resetStringInfo(request_message);
+
+ /*
+ * Send the current time to update the remote walsender's latest reply
+ * message received time.
+ */
+ pq_sendbyte(request_message, 'p');
+ pq_sendint64(request_message, GetCurrentTimestamp());
+
+ elog(DEBUG2, "sending publisher status request message");
+
+ /* Send a request for the publisher status */
+ walrcv_send(LogRepWorkerWalRcvConn,
+ request_message->data, request_message->len);
+
+ rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
+
+ /*
+ * Skip calling maybe_advance_nonremovable_xid() since further transition
+ * is possible only once we receive the publisher status message.
+ */
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
+ */
+static void
+wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+ bool status_received)
+{
+ /*
+ * Return if we have requested but not yet received the publisher status.
+ */
+ if (!status_received)
+ return;
+
+ if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
+ rdt_data->remote_wait_for = rdt_data->remote_nextxid;
+
+ /*
+ * Check if all remote concurrent transactions that were active at the
+ * first status request have now completed. If completed, proceed to the
+ * next phase; otherwise, continue checking the publisher status until
+ * these transactions finish.
+ *
+ * It's possible that transactions in the commit phase during the last
+ * cycle have now finished committing, but remote_oldestxid remains older
+ * than remote_wait_for. This can happen if some old transaction came in
+ * the commit phase when we requested status in this cycle. We do not
+ * handle this case explicitly as it's rare and the benefit doesn't
+ * justify the required complexity. Tracking would require either caching
+ * all xids at the publisher or sending them to subscribers. The condition
+ * will resolve naturally once the remaining transactions are finished.
+ *
+ * Directly advancing the non-removable transaction ID is possible if
+ * there are no activities on the publisher since the last advancement
+ * cycle. However, it requires maintaining two fields, last_remote_nextxid
+ * and last_remote_lsn, within the structure for comparison with the
+ * current cycle's values. Considering the minimal cost of continuing in
+ * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
+ * advance the transaction ID here.
+ */
+ if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
+ rdt_data->remote_oldestxid))
+ rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
+ else
+ rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
+ */
+static void
+wait_for_local_flush(RetainDeadTuplesData *rdt_data)
+{
+ Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
+ TransactionIdIsValid(rdt_data->candidate_xid));
+
+ /*
+ * We expect the publisher and subscriber clocks to be in sync using time
+ * sync service like NTP. Otherwise, we will advance this worker's
+ * oldest_nonremovable_xid prematurely, leading to the removal of rows
+ * required to detect conflicts reliably. This check primarily addresses
+ * scenarios where the publisher's clock falls behind; if the publisher's
+ * clock is ahead, subsequent transactions will naturally bear later
+ * commit timestamps, conforming to the design outlined atop worker.c.
+ *
+ * XXX Consider waiting for the publisher's clock to catch up with the
+ * subscriber's before proceeding to the next phase.
+ */
+ if (TimestampDifferenceExceeds(rdt_data->reply_time,
+ rdt_data->candidate_xid_time, 0))
+ ereport(ERROR,
+ errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
+ errdetail_internal("The clock on the publisher is behind that of the subscriber."));
+
+ /*
+ * Do not attempt to advance the non-removable transaction ID when table
+ * sync is in progress. During this time, changes from a single
+ * transaction may be applied by multiple table sync workers corresponding
+ * to the target tables. So, it's necessary for all table sync workers to
+ * apply and flush the corresponding changes before advancing the
+ * transaction ID, otherwise, dead tuples that are still needed for
+ * conflict detection in table sync workers could be removed prematurely.
+ * However, confirming the apply and flush progress across all table sync
+ * workers is complex and not worth the effort, so we simply return if not
+ * all tables are in the READY state.
+ *
+ * It is safe to add new tables with initial states to the subscription
+ * after this check because any changes applied to these tables should
+ * have a WAL position greater than the rdt_data->remote_lsn.
+ */
+ if (!AllTablesyncsReady())
+ return;
+
+ /*
+ * Update and check the remote flush position if we are applying changes
+ * in a loop. This is done at most once per WalWriterDelay to avoid
+ * performing costly operations in get_flush_position() too frequently
+ * during change application.
+ */
+ if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
+ TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
+ rdt_data->last_recv_time, WalWriterDelay))
+ {
+ XLogRecPtr writepos;
+ XLogRecPtr flushpos;
+ bool have_pending_txes;
+
+ /* Fetch the latest remote flush position */
+ get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+ if (flushpos > last_flushpos)
+ last_flushpos = flushpos;
+
+ rdt_data->flushpos_update_time = rdt_data->last_recv_time;
+ }
+
+ /* Return to wait for the changes to be applied */
+ if (last_flushpos < rdt_data->remote_lsn)
+ return;
+
+ /*
+ * Reaching here means the remote WAL position has been received, and all
+ * transactions up to that position on the publisher have been applied and
+ * flushed locally. So, we can advance the non-removable transaction ID.
+ */
+ SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+ MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+ SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+ elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
+ LSN_FORMAT_ARGS(rdt_data->remote_lsn),
+ rdt_data->candidate_xid);
+
+ /* Notify launcher to update the xmin of the conflict slot */
+ ApplyLauncherWakeup();
+
+ /*
+ * Reset all data fields except those used to determine the timing for the
+ * next round of transaction ID advancement. We can even use
+ * flushpos_update_time in the next round to decide whether to get the
+ * latest flush position.
+ */
+ rdt_data->phase = RDT_GET_CANDIDATE_XID;
+ rdt_data->remote_lsn = InvalidXLogRecPtr;
+ rdt_data->remote_oldestxid = InvalidFullTransactionId;
+ rdt_data->remote_nextxid = InvalidFullTransactionId;
+ rdt_data->reply_time = 0;
+ rdt_data->remote_wait_for = InvalidFullTransactionId;
+ rdt_data->candidate_xid = InvalidTransactionId;
+
+ /* process the next phase */
+ process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Adjust the interval for advancing non-removable transaction IDs.
+ *
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise to a
+ * 3 minutes which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
+ *
+ * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
+ * consider the other interval or a separate GUC if the need arises.
+ */
+static void
+adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
+{
+ if (!new_xid_found && rdt_data->xid_advance_interval)
+ {
+ int max_interval = wal_receiver_status_interval
+ ? wal_receiver_status_interval * 1000
+ : MAX_XID_ADVANCE_INTERVAL;
+
+ /*
+ * No new transaction ID has been assigned since the last check, so
+ * double the interval, but not beyond the maximum allowable value.
+ */
+ rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
+ max_interval);
+ }
+ else
+ {
+ /*
+ * A new transaction ID was found or the interval is not yet
+ * initialized, so set the interval to the minimum value.
+ */
+ rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
+ }
+}
+
+/*
* Exit routine for apply workers due to subscription parameter changes.
*/
static void
@@ -4708,6 +5297,30 @@ InitializeLogRepWorker(void)
apply_worker_exit();
}
+ /*
+ * Restart the worker if retain_dead_tuples was enabled during startup.
+ *
+ * At this point, the replication slot used for conflict detection might
+ * not exist yet, or could be dropped soon if the launcher perceives
+ * retain_dead_tuples as disabled. To avoid unnecessary tracking of
+ * oldest_nonremovable_xid when the slot is absent or at risk of being
+ * dropped, a restart is initiated.
+ *
+ * The oldest_nonremovable_xid should be initialized only when the
+ * retain_dead_tuples is enabled before launching the worker. See
+ * logicalrep_worker_launch.
+ */
+ if (am_leader_apply_worker() &&
+ MySubscription->retaindeadtuples &&
+ !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
+ {
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
+ MySubscription->name, "retain_dead_tuples"));
+
+ apply_worker_exit();
+ }
+
/* Setup synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
@@ -4864,6 +5477,14 @@ DisableSubscriptionAndExit(void)
errmsg("subscription \"%s\" has been disabled because of an error",
MySubscription->name));
+ /*
+ * Skip the track_commit_timestamp check when disabling the worker due to
+ * an error, as verifying commit timestamps is unnecessary in this
+ * context.
+ */
+ if (MySubscription->retaindeadtuples)
+ CheckSubDeadTupleRetention(false, true, WARNING);
+
proc_exit(0);
}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e44ad576bc7..8605776ad86 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -47,6 +47,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
+#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
@@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
static void ReplicationSlotShmemExit(int code, Datum arg);
+static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
@@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg)
/*
* Check whether the passed slot name is valid and report errors at elevel.
*
+ * An error will be reported for a reserved replication slot name if
+ * allow_reserved_name is set to false.
+ *
* Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
* the name to be used as a directory name on every supported OS.
*
* Returns whether the directory name is valid or not if elevel < ERROR.
*/
bool
-ReplicationSlotValidateName(const char *name, int elevel)
+ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
+ int elevel)
{
const char *cp;
@@ -300,10 +306,32 @@ ReplicationSlotValidateName(const char *name, int elevel)
return false;
}
}
+
+ if (!allow_reserved_name && IsSlotForConflictCheck(name))
+ {
+ ereport(elevel,
+ errcode(ERRCODE_RESERVED_NAME),
+ errmsg("replication slot name \"%s\" is reserved",
+ name),
+ errdetail("The name \"%s\" is reserved for the conflict detection slot.",
+ CONFLICT_DETECTION_SLOT));
+
+ return false;
+ }
+
return true;
}
/*
+ * Return true if the replication slot name is "pg_conflict_detection".
+ */
+static bool
+IsSlotForConflictCheck(const char *name)
+{
+ return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
+}
+
+/*
* Create a new replication slot and mark it as used by this backend.
*
* name: Name of the slot
@@ -330,7 +358,12 @@ ReplicationSlotCreate(const char *name, bool db_specific,
Assert(MyReplicationSlot == NULL);
- ReplicationSlotValidateName(name, ERROR);
+ /*
+ * The logical launcher or pg_upgrade may create or migrate an internal
+ * slot, so using a reserved name is allowed in these cases.
+ */
+ ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
+ ERROR);
if (failover)
{
@@ -582,6 +615,17 @@ retry:
}
/*
+ * Do not allow users to acquire the reserved slot. This scenario may
+ * occur if the launcher that owns the slot has terminated unexpectedly
+ * due to an error, and a backend process attempts to reuse the slot.
+ */
+ if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
+ ereport(ERROR,
+ errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("cannot acquire replication slot \"%s\"", name),
+ errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
+
+ /*
* This is the slot we want; check if it's active under some other
* process. In single user mode, we don't need this check.
*/
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 28b8591efa5..4c72a0d43b3 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -84,6 +84,7 @@
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
+#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
@@ -258,6 +259,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessStandbyPSRequestMessage(void);
static void ProcessRepliesIfAny(void);
static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
@@ -2355,6 +2357,10 @@ ProcessStandbyMessage(void)
ProcessStandbyHSFeedbackMessage();
break;
+ case 'p':
+ ProcessStandbyPSRequestMessage();
+ break;
+
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2702,6 +2708,60 @@ ProcessStandbyHSFeedbackMessage(void)
}
/*
+ * Process the request for a primary status update message.
+ */
+static void
+ProcessStandbyPSRequestMessage(void)
+{
+ XLogRecPtr lsn = InvalidXLogRecPtr;
+ TransactionId oldestXidInCommit;
+ FullTransactionId nextFullXid;
+ FullTransactionId fullOldestXidInCommit;
+ WalSnd *walsnd = MyWalSnd;
+ TimestampTz replyTime;
+
+ /*
+ * This shouldn't happen because we don't support getting primary status
+ * message from standby.
+ */
+ if (RecoveryInProgress())
+ elog(ERROR, "the primary status is unavailable during recovery");
+
+ replyTime = pq_getmsgint64(&reply_message);
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+
+ /*
+ * Consider transactions in the current database, as only these are the
+ * ones replicated.
+ */
+ oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+ nextFullXid = ReadNextFullTransactionId();
+ fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
+ oldestXidInCommit);
+ lsn = GetXLogWriteRecPtr();
+
+ elog(DEBUG2, "sending primary status");
+
+ /* construct the message... */
+ resetStringInfo(&output_message);
+ pq_sendbyte(&output_message, 's');
+ pq_sendint64(&output_message, lsn);
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
+ pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
+ pq_sendint64(&output_message, GetCurrentTimestamp());
+
+ /* ... and send it wrapped in CopyData */
+ pq_putmessage_noblock('d', output_message.data, output_message.len);
+}
+
+/*
* Compute how long send/receive loops should sleep.
*
* If wal_sender_timeout is enabled we want to wake up in time to send
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 2418967def6..bf987aed8d3 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2814,8 +2814,10 @@ GetRunningTransactionData(void)
*
* Similar to GetSnapshotData but returns just oldestActiveXid. We include
* all PGPROCs with an assigned TransactionId, even VACUUM processes.
- * We look at all databases, though there is no need to include WALSender
- * since this has no effect on hot standby conflicts.
+ *
+ * If allDbs is true, we look at all databases, though there is no need to
+ * include WALSender since this has no effect on hot standby conflicts. If
+ * allDbs is false, skip processes attached to other databases.
*
* This is never executed during recovery so there is no need to look at
* KnownAssignedXids.
@@ -2823,9 +2825,12 @@ GetRunningTransactionData(void)
* We don't worry about updating other counters, we want to keep this as
* simple as possible and leave GetSnapshotData() as the primary code for
* that bookkeeping.
+ *
+ * inCommitOnly indicates getting the oldestActiveXid among the transactions
+ * in the commit critical section.
*/
TransactionId
-GetOldestActiveTransactionId(void)
+GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
{
ProcArrayStruct *arrayP = procArray;
TransactionId *other_xids = ProcGlobal->xids;
@@ -2852,6 +2857,8 @@ GetOldestActiveTransactionId(void)
for (index = 0; index < arrayP->numProcs; index++)
{
TransactionId xid;
+ int pgprocno = arrayP->pgprocnos[index];
+ PGPROC *proc = &allProcs[pgprocno];
/* Fetch xid just once - see GetNewTransactionId */
xid = UINT32_ACCESS_ONCE(other_xids[index]);
@@ -2859,6 +2866,13 @@ GetOldestActiveTransactionId(void)
if (!TransactionIdIsNormal(xid))
continue;
+ if (inCommitOnly &&
+ (proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
+ continue;
+
+ if (!allDbs && proc->databaseId != MyDatabaseId)
+ continue;
+
if (TransactionIdPrecedes(xid, oldestRunningXid))
oldestRunningXid = xid;
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index d44f8c262ba..a4f8b4faa90 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -21,6 +21,7 @@
#include "commands/extension.h"
#include "miscadmin.h"
#include "replication/logical.h"
+#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
@@ -410,3 +411,21 @@ binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+
+/*
+ * binary_upgrade_create_conflict_detection_slot
+ *
+ * Create a replication slot to retain information necessary for conflict
+ * detection such as dead tuples, commit timestamps, and origins.
+ */
+Datum
+binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS)
+{
+ CHECK_IS_BINARY_UPGRADE;
+
+ CreateConflictDetectionSlot();
+
+ ReplicationSlotRelease();
+
+ PG_RETURN_VOID();
+}