Skip to content

Commit d0081c7

Browse files
Zhijie HouCommitfest Bot
Zhijie Hou
authored and
Commitfest Bot
committed
Introduce a new GUC 'max_conflict_retention_duration'
This commit introduces a GUC option max_conflict_retention_duration, designed to prevent excessive accumulation of dead tuples when subscription with retain_conflict_info enabled is present and the the apply worker cannot catch up with the publisher's workload. If the time spent advancing non-removable transaction ID surpasses the max_conflict_retention_duration threshold, the apply worker would stop retaining information for conflict detection. The replication slot pg_conflict_detection will be invalidated if all apply workers associated with the subscription, where retain_conflict_info is enabled, confirm that the retention duration exceeded the max_conflict_retention_duration. In this patch, a replication slot will not be automatically re-created if it becomes invalidated. Users can disable retain_conflict_info and re-enable it after confirming that the replication slot has been dropped. An upcoming patch will include support for automatic slot recreation once at least one apply worker confirms that the retention duration is within the max_conflict_retention_duration limit. To monitor worker's conflict retention status, this patch also introduces a new column 'retain_conflict_info' in the pg_stat_subscription view. This column indicates whether the apply worker is effectively retaining conflict information. The value is set to true only if retain_conflict_info is enabled for the associated subscription, and the retention duration for conflict detection by the apply worker has not exceeded max_conflict_retention_duration.
1 parent fd28cc2 commit d0081c7

File tree

14 files changed

+337
-26
lines changed

14 files changed

+337
-26
lines changed

doc/src/sgml/config.sgml

+41
Original file line numberDiff line numberDiff line change
@@ -5379,6 +5379,47 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
53795379
</listitem>
53805380
</varlistentry>
53815381

5382+
<varlistentry id="guc-max-conflict-retention-duration" xreflabel="max_conflict_retention_duration">
5383+
<term><varname>max_conflict_retention_duration</varname> (<type>integer</type>)
5384+
<indexterm>
5385+
<primary><varname>max_conflict_retention_duration</varname> configuration parameter</primary>
5386+
</indexterm>
5387+
</term>
5388+
<listitem>
5389+
<para>
5390+
Maximum duration (in milliseconds) for which conflict
5391+
information can be retained for conflict detection by the apply worker.
5392+
The default value is <literal>0</literal>, indicating that conflict
5393+
information is retained until it is no longer needed for detection
5394+
purposes.
5395+
</para>
5396+
<para>
5397+
The replication slot
5398+
<quote><literal>pg_conflict_detection</literal></quote> that used to
5399+
retain conflict information will be invalidated if all apply workers
5400+
associated with the subscriptions, where
5401+
<literal>retain_conflict_info</literal> is enabled, confirm that the
5402+
retention duration exceeded the
5403+
<literal>max_conflict_retention_duration</literal>. If the replication
5404+
slot is invalidated, you can disable
5405+
<literal>retain_conflict_info</literal> and re-enable it after
5406+
confirming this replication slot has been dropped.
5407+
</para>
5408+
<para>
5409+
This option is effective only if a subscription with
5410+
<literal>retain_conflict_info</literal> enabled is present, and the
5411+
associated apply worker is active.
5412+
</para>
5413+
<warning>
5414+
<para>
5415+
Note that setting a non-zero value for this option could lead to
5416+
conflict information being removed prematurely, potentially missing
5417+
some conflict detections.
5418+
</para>
5419+
</warning>
5420+
</listitem>
5421+
</varlistentry>
5422+
53825423
</variablelist>
53835424
</sect2>
53845425

doc/src/sgml/monitoring.sgml

+13
Original file line numberDiff line numberDiff line change
@@ -2109,6 +2109,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage
21092109
sender; NULL for parallel apply workers
21102110
</para></entry>
21112111
</row>
2112+
2113+
<row>
2114+
<entry role="catalog_table_entry"><para role="column_definition">
2115+
<structfield>retain_conflict_info</structfield> <type>boolean</type>
2116+
</para>
2117+
<para>
2118+
True if <link linkend="sql-createsubscription-params-with-retain-conflict-info"><literal>retain_conflict_info</literal></link>
2119+
is enabled and the duration for which conflict information is
2120+
retained for conflict detection by this apply worker does not exceed
2121+
<link linkend="guc-max-conflict-retention-duration"><literal>max_conflict_retention_duration</literal></link>; NULL for
2122+
parallel apply workers and table synchronization workers.
2123+
</para></entry>
2124+
</row>
21122125
</tbody>
21132126
</tgroup>
21142127
</table>

doc/src/sgml/system-views.sgml

+11
Original file line numberDiff line numberDiff line change
@@ -2936,6 +2936,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
29362936
<xref linkend="guc-idle-replication-slot-timeout"/> duration.
29372937
</para>
29382938
</listitem>
2939+
<listitem>
2940+
<para>
2941+
<literal>conflict_retention_exceeds_max_duration</literal> means that
2942+
the duration for retaining conflict information, which is used
2943+
in logical replication conflict detection, has exceeded the maximum
2944+
allowable limit. It is set only for the slot
2945+
<literal>pg_conflict_detection</literal>, which is created when
2946+
<link linkend="sql-createsubscription-params-with-retain-conflict-info"><literal>retain_conflict_info</literal></link>
2947+
is enabled.
2948+
</para>
2949+
</listitem>
29392950
</itemizedlist>
29402951
</para></entry>
29412952
</row>

src/backend/catalog/system_views.sql

+2-1
Original file line numberDiff line numberDiff line change
@@ -993,7 +993,8 @@ CREATE VIEW pg_stat_subscription AS
993993
st.last_msg_send_time,
994994
st.last_msg_receipt_time,
995995
st.latest_end_lsn,
996-
st.latest_end_time
996+
st.latest_end_time,
997+
st.retain_conflict_info
997998
FROM pg_subscription su
998999
LEFT JOIN pg_stat_get_subscription(NULL) st
9991000
ON (st.subid = su.oid);

src/backend/replication/logical/launcher.c

+72-8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "access/tableam.h"
2424
#include "access/xact.h"
2525
#include "catalog/pg_subscription.h"
26+
#include "catalog/pg_subscription_d.h"
2627
#include "catalog/pg_subscription_rel.h"
2728
#include "funcapi.h"
2829
#include "lib/dshash.h"
@@ -43,6 +44,7 @@
4344
#include "utils/memutils.h"
4445
#include "utils/pg_lsn.h"
4546
#include "utils/snapmgr.h"
47+
#include "utils/syscache.h"
4648

4749
/* max sleep time between cycles (3min) */
4850
#define DEFAULT_NAPTIME_PER_CYCLE 180000L
@@ -51,6 +53,7 @@
5153
int max_logical_replication_workers = 4;
5254
int max_sync_workers_per_subscription = 2;
5355
int max_parallel_apply_workers_per_subscription = 2;
56+
int max_conflict_retention_duration = 0;
5457

5558
LogicalRepWorker *MyLogicalRepWorker = NULL;
5659

@@ -446,6 +449,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
446449
worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
447450
worker->parallel_apply = is_parallel_apply_worker;
448451
worker->oldest_nonremovable_xid = InvalidFullTransactionId;
452+
worker->stop_conflict_info_retention = false;
449453
worker->last_lsn = InvalidXLogRecPtr;
450454
TIMESTAMP_NOBEGIN(worker->last_send_time);
451455
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -1157,7 +1161,8 @@ ApplyLauncherMain(Datum main_arg)
11571161
MemoryContext oldctx;
11581162
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
11591163
bool can_advance_xmin = true;
1160-
bool retain_conflict_info = false;
1164+
int nretain_conflict_info = 0;
1165+
int nstop_retention = 0;
11611166
FullTransactionId xmin = InvalidFullTransactionId;
11621167

11631168
CHECK_FOR_INTERRUPTS();
@@ -1186,7 +1191,7 @@ ApplyLauncherMain(Datum main_arg)
11861191
*/
11871192
if (sub->retainconflictinfo)
11881193
{
1189-
retain_conflict_info = true;
1194+
nretain_conflict_info++;
11901195
can_advance_xmin &= sub->enabled;
11911196

11921197
/*
@@ -1212,22 +1217,32 @@ ApplyLauncherMain(Datum main_arg)
12121217
* the new xmin for advancing the replication slot used in
12131218
* conflict detection.
12141219
*/
1215-
if (sub->retainconflictinfo && can_advance_xmin)
1220+
if (sub->retainconflictinfo)
12161221
{
12171222
FullTransactionId nonremovable_xid;
1223+
bool stop_conflict_info_retention;
12181224

12191225
SpinLockAcquire(&w->relmutex);
12201226
nonremovable_xid = w->oldest_nonremovable_xid;
1227+
stop_conflict_info_retention = w->stop_conflict_info_retention;
12211228
SpinLockRelease(&w->relmutex);
12221229

1230+
/*
1231+
* Skip collecting oldest_nonremovable_xid for workers
1232+
* that have stopped conflict retention.
1233+
*/
1234+
if (stop_conflict_info_retention)
1235+
nstop_retention++;
1236+
12231237
/*
12241238
* Stop advancing xmin if an invalid non-removable
12251239
* transaction ID is found, otherwise update xmin.
12261240
*/
1227-
if (!FullTransactionIdIsValid(nonremovable_xid))
1241+
else if (!FullTransactionIdIsValid(nonremovable_xid))
12281242
can_advance_xmin = false;
1229-
else if (!FullTransactionIdIsValid(xmin) ||
1230-
FullTransactionIdPrecedes(nonremovable_xid, xmin))
1243+
else if (can_advance_xmin &&
1244+
(!FullTransactionIdIsValid(xmin) ||
1245+
FullTransactionIdPrecedes(nonremovable_xid, xmin)))
12311246
xmin = nonremovable_xid;
12321247
}
12331248

@@ -1272,11 +1287,34 @@ ApplyLauncherMain(Datum main_arg)
12721287
}
12731288
}
12741289

1290+
/*
1291+
* Do nothing if the replication slot is invalidated due to conflict
1292+
* retention duration.
1293+
*/
1294+
if (nretain_conflict_info &&
1295+
MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
1296+
{
1297+
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_CONFLICT_RETENTION_DURATION);
1298+
}
1299+
1300+
/*
1301+
* Invalidate the conflict slot if all workers with
1302+
* retain_conflict_info enabled have stopped further conflict
1303+
* retention.
1304+
*/
1305+
else if (nstop_retention && nretain_conflict_info == nstop_retention)
1306+
{
1307+
ReplicationSlotRelease();
1308+
InvalidateObsoleteReplicationSlots(RS_INVAL_CONFLICT_RETENTION_DURATION,
1309+
InvalidXLogRecPtr, InvalidOid,
1310+
InvalidTransactionId);
1311+
}
1312+
12751313
/*
12761314
* Maintain the xmin value of the replication slot for conflict
12771315
* detection if needed.
12781316
*/
1279-
if (retain_conflict_info)
1317+
else if (nretain_conflict_info)
12801318
{
12811319
if (can_advance_xmin)
12821320
advance_conflict_slot_xmin(xmin);
@@ -1470,7 +1508,7 @@ GetLeaderApplyWorkerPid(pid_t pid)
14701508
Datum
14711509
pg_stat_get_subscription(PG_FUNCTION_ARGS)
14721510
{
1473-
#define PG_STAT_GET_SUBSCRIPTION_COLS 10
1511+
#define PG_STAT_GET_SUBSCRIPTION_COLS 11
14741512
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
14751513
int i;
14761514
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
@@ -1547,6 +1585,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
15471585
elog(ERROR, "unknown worker type");
15481586
}
15491587

1588+
/*
1589+
* Only the leader apply worker manages conflict retention (see
1590+
* maybe_advance_nonremovable_xid() for details).
1591+
*/
1592+
if (!isParallelApplyWorker(&worker) && !isTablesyncWorker(&worker))
1593+
{
1594+
HeapTuple tup;
1595+
Form_pg_subscription subform;
1596+
1597+
tup = SearchSysCache1(SUBSCRIPTIONOID,
1598+
ObjectIdGetDatum(worker.subid));
1599+
1600+
if (!HeapTupleIsValid(tup))
1601+
elog(ERROR, "cache lookup failed for subscription %u",
1602+
worker.subid);
1603+
1604+
subform = (Form_pg_subscription) GETSTRUCT(tup);
1605+
1606+
values[10] = subform->subretainconflictinfo &&
1607+
!worker.stop_conflict_info_retention;
1608+
1609+
ReleaseSysCache(tup);
1610+
}
1611+
else
1612+
nulls[10] = true;
1613+
15501614
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
15511615
values, nulls);
15521616

0 commit comments

Comments
 (0)