14
14
15
15
#include "postgres.h"
16
16
17
+ #include "access/commit_ts.h"
17
18
#include "access/htup_details.h"
18
19
#include "access/table.h"
19
20
#include "access/twophase.h"
71
72
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
72
73
#define SUBOPT_RUN_AS_OWNER 0x00001000
73
74
#define SUBOPT_FAILOVER 0x00002000
74
- #define SUBOPT_LSN 0x00004000
75
- #define SUBOPT_ORIGIN 0x00008000
75
+ #define SUBOPT_RETAIN_CONFLICT_INFO 0x00004000
76
+ #define SUBOPT_LSN 0x00008000
77
+ #define SUBOPT_ORIGIN 0x00010000
76
78
77
79
/* check if the 'val' has 'bits' set */
78
80
#define IsSet (val , bits ) (((val) & (bits)) == (bits))
@@ -98,6 +100,7 @@ typedef struct SubOpts
98
100
bool passwordrequired ;
99
101
bool runasowner ;
100
102
bool failover ;
103
+ bool retainconflictinfo ;
101
104
char * origin ;
102
105
XLogRecPtr lsn ;
103
106
} SubOpts ;
@@ -107,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn,
107
110
List * publications , bool copydata ,
108
111
char * origin , Oid * subrel_local_oids ,
109
112
int subrel_count , char * subname );
113
+ static void check_conflict_info_retaintion (WalReceiverConn * wrconn ,
114
+ bool retain_conflict_info );
110
115
static void check_duplicates_in_publist (List * publist , Datum * datums );
111
116
static List * merge_publications (List * oldpublist , List * newpublist , bool addpub , const char * subname );
112
117
static void ReportSlotConnectionError (List * rstates , Oid subid , char * slotname , char * err );
@@ -162,6 +167,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
162
167
opts -> runasowner = false;
163
168
if (IsSet (supported_opts , SUBOPT_FAILOVER ))
164
169
opts -> failover = false;
170
+ if (IsSet (supported_opts , SUBOPT_RETAIN_CONFLICT_INFO ))
171
+ opts -> retainconflictinfo = false;
165
172
if (IsSet (supported_opts , SUBOPT_ORIGIN ))
166
173
opts -> origin = pstrdup (LOGICALREP_ORIGIN_ANY );
167
174
@@ -307,6 +314,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
307
314
opts -> specified_opts |= SUBOPT_FAILOVER ;
308
315
opts -> failover = defGetBoolean (defel );
309
316
}
317
+ else if (IsSet (supported_opts , SUBOPT_RETAIN_CONFLICT_INFO ) &&
318
+ strcmp (defel -> defname , "retain_conflict_info" ) == 0 )
319
+ {
320
+ if (IsSet (opts -> specified_opts , SUBOPT_RETAIN_CONFLICT_INFO ))
321
+ errorConflictingDefElem (defel , pstate );
322
+
323
+ opts -> specified_opts |= SUBOPT_RETAIN_CONFLICT_INFO ;
324
+ opts -> retainconflictinfo = defGetBoolean (defel );
325
+ }
310
326
else if (IsSet (supported_opts , SUBOPT_ORIGIN ) &&
311
327
strcmp (defel -> defname , "origin" ) == 0 )
312
328
{
@@ -563,7 +579,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
563
579
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
564
580
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
565
581
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
566
- SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN );
582
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
583
+ SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN );
567
584
parse_subscription_options (pstate , stmt -> options , supported_opts , & opts );
568
585
569
586
/*
@@ -608,6 +625,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
608
625
errmsg ("password_required=false is superuser-only" ),
609
626
errhint ("Subscriptions with the password_required option set to false may only be created or modified by the superuser." )));
610
627
628
+ if (opts .retainconflictinfo && !track_commit_timestamp )
629
+ ereport (WARNING ,
630
+ errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
631
+ errmsg ("information for detecting conflicts cannot be fully retained when \"%s\" is disabled" ,
632
+ "track_commit_timestamp" ));
633
+
611
634
/*
612
635
* If built with appropriate switch, whine when regression-testing
613
636
* conventions for subscription names are violated.
@@ -670,6 +693,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
670
693
values [Anum_pg_subscription_subpasswordrequired - 1 ] = BoolGetDatum (opts .passwordrequired );
671
694
values [Anum_pg_subscription_subrunasowner - 1 ] = BoolGetDatum (opts .runasowner );
672
695
values [Anum_pg_subscription_subfailover - 1 ] = BoolGetDatum (opts .failover );
696
+ values [Anum_pg_subscription_subretainconflictinfo - 1 ] =
697
+ BoolGetDatum (opts .retainconflictinfo );
673
698
values [Anum_pg_subscription_subconninfo - 1 ] =
674
699
CStringGetTextDatum (conninfo );
675
700
if (opts .slot_name )
@@ -724,6 +749,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
724
749
check_publications_origin (wrconn , publications , opts .copy_data ,
725
750
opts .origin , NULL , 0 , stmt -> subname );
726
751
752
+ check_conflict_info_retaintion (wrconn , opts .retainconflictinfo );
753
+
727
754
/*
728
755
* Set sync state based on if we were asked to do data copy or
729
756
* not.
@@ -1110,6 +1137,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1110
1137
bool update_tuple = false;
1111
1138
bool update_failover = false;
1112
1139
bool update_two_phase = false;
1140
+ bool retain_conflict_info = false;
1113
1141
Subscription * sub ;
1114
1142
Form_pg_subscription form ;
1115
1143
bits32 supported_opts ;
@@ -1165,7 +1193,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1165
1193
SUBOPT_DISABLE_ON_ERR |
1166
1194
SUBOPT_PASSWORD_REQUIRED |
1167
1195
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
1168
- SUBOPT_ORIGIN );
1196
+ SUBOPT_RETAIN_CONFLICT_INFO | SUBOPT_ORIGIN );
1169
1197
1170
1198
parse_subscription_options (pstate , stmt -> options ,
1171
1199
supported_opts , & opts );
@@ -1325,6 +1353,29 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1325
1353
replaces [Anum_pg_subscription_subfailover - 1 ] = true;
1326
1354
}
1327
1355
1356
+ if (IsSet (opts .specified_opts , SUBOPT_RETAIN_CONFLICT_INFO ))
1357
+ {
1358
+ values [Anum_pg_subscription_subretainconflictinfo - 1 ] =
1359
+ BoolGetDatum (opts .retainconflictinfo );
1360
+ replaces [Anum_pg_subscription_subretainconflictinfo - 1 ] = true;
1361
+
1362
+ if (opts .retainconflictinfo && !track_commit_timestamp )
1363
+ ereport (WARNING ,
1364
+ errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
1365
+ errmsg ("information for detecting conflicts cannot be fully retained when \"%s\" is disabled" ,
1366
+ "track_commit_timestamp" ));
1367
+
1368
+ /*
1369
+ * Notify the launcher to manage the replication slot for
1370
+ * conflict detection. This ensures that replication slot
1371
+ * is efficiently handled (created, updated, or dropped)
1372
+ * in response to any configuration changes.
1373
+ */
1374
+ ApplyLauncherWakeupAtCommit ();
1375
+
1376
+ retain_conflict_info = opts .retainconflictinfo ;
1377
+ }
1378
+
1328
1379
if (IsSet (opts .specified_opts , SUBOPT_ORIGIN ))
1329
1380
{
1330
1381
values [Anum_pg_subscription_suborigin - 1 ] =
@@ -1355,6 +1406,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1355
1406
ApplyLauncherWakeupAtCommit ();
1356
1407
1357
1408
update_tuple = true;
1409
+
1410
+ /*
1411
+ * The subscription might be initially created with
1412
+ * connect=false and retain_conflict_info=true, meaning the
1413
+ * remote server's status may not be checked. Ensure this
1414
+ * check is conducted now.
1415
+ */
1416
+ retain_conflict_info = sub -> retainconflictinfo ;
1358
1417
break ;
1359
1418
}
1360
1419
@@ -1369,6 +1428,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1369
1428
CStringGetTextDatum (stmt -> conninfo );
1370
1429
replaces [Anum_pg_subscription_subconninfo - 1 ] = true;
1371
1430
update_tuple = true;
1431
+
1432
+ /*
1433
+ * Since the remote server configuration might have changed,
1434
+ * perform a check to ensure it permits enabling
1435
+ * retain_conflict_info.
1436
+ */
1437
+ retain_conflict_info = sub -> retainconflictinfo ;
1372
1438
break ;
1373
1439
1374
1440
case ALTER_SUBSCRIPTION_SET_PUBLICATION :
@@ -1568,14 +1634,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1568
1634
}
1569
1635
1570
1636
/*
1571
- * Try to acquire the connection necessary for altering the slot, if
1572
- * needed.
1637
+ * Try to acquire the connection necessary either for modifying the slot
1638
+ * or for checking if the remote server permits enabling
1639
+ * retain_conflict_info.
1573
1640
*
1574
1641
* This has to be at the end because otherwise if there is an error while
1575
1642
* doing the database operations we won't be able to rollback altered
1576
1643
* slot.
1577
1644
*/
1578
- if (update_failover || update_two_phase )
1645
+ if (update_failover || update_two_phase || retain_conflict_info )
1579
1646
{
1580
1647
bool must_use_password ;
1581
1648
char * err ;
@@ -1584,10 +1651,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1584
1651
/* Load the library providing us libpq calls. */
1585
1652
load_file ("libpqwalreceiver" , false);
1586
1653
1587
- /* Try to connect to the publisher. */
1654
+ /*
1655
+ * Try to connect to the publisher, using the new connection string if
1656
+ * available.
1657
+ */
1588
1658
must_use_password = sub -> passwordrequired && !sub -> ownersuperuser ;
1589
- wrconn = walrcv_connect (sub -> conninfo , true, true, must_use_password ,
1590
- sub -> name , & err );
1659
+ wrconn = walrcv_connect (stmt -> conninfo ? stmt -> conninfo : sub -> conninfo ,
1660
+ true, true, must_use_password , sub -> name ,
1661
+ & err );
1591
1662
if (!wrconn )
1592
1663
ereport (ERROR ,
1593
1664
(errcode (ERRCODE_CONNECTION_FAILURE ),
@@ -1596,9 +1667,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
1596
1667
1597
1668
PG_TRY ();
1598
1669
{
1599
- walrcv_alter_slot (wrconn , sub -> slotname ,
1600
- update_failover ? & opts .failover : NULL ,
1601
- update_two_phase ? & opts .twophase : NULL );
1670
+ check_conflict_info_retaintion (wrconn , retain_conflict_info );
1671
+
1672
+ if (update_failover || update_two_phase )
1673
+ walrcv_alter_slot (wrconn , sub -> slotname ,
1674
+ update_failover ? & opts .failover : NULL ,
1675
+ update_two_phase ? & opts .twophase : NULL );
1602
1676
}
1603
1677
PG_FINALLY ();
1604
1678
{
@@ -2196,6 +2270,57 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications,
2196
2270
walrcv_clear_result (res );
2197
2271
}
2198
2272
2273
+ /*
2274
+ * Check if the publisher's status permits enabling retain_conflict_info.
2275
+ *
2276
+ * Enabling retain_conflict_info is not allowed if the publisher's version is
2277
+ * prior to PG18 or if the publisher is in recovery (operating as a standby
2278
+ * server).
2279
+ *
2280
+ * Refer to the comments atop maybe_advance_nonremovable_xid() for detailed
2281
+ * reasons.
2282
+ */
2283
+ static void
2284
+ check_conflict_info_retaintion (WalReceiverConn * wrconn , bool retain_conflict_info )
2285
+ {
2286
+ WalRcvExecResult * res ;
2287
+ Oid RecoveryRow [1 ] = {BOOLOID };
2288
+ TupleTableSlot * slot ;
2289
+ bool isnull ;
2290
+ bool remote_in_recovery ;
2291
+
2292
+ if (!retain_conflict_info )
2293
+ return ;
2294
+
2295
+ if (walrcv_server_version (wrconn ) < 18000 )
2296
+ ereport (ERROR ,
2297
+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
2298
+ errmsg ("cannot enable retain_conflict_info if the publisher is running a version earlier than PostgreSQL 18." ));
2299
+
2300
+ res = walrcv_exec (wrconn , "SELECT pg_is_in_recovery()" , 1 , RecoveryRow );
2301
+
2302
+ if (res -> status != WALRCV_OK_TUPLES )
2303
+ ereport (ERROR ,
2304
+ (errcode (ERRCODE_CONNECTION_FAILURE ),
2305
+ errmsg ("could not obtain recovery progress from the publisher: %s" ,
2306
+ res -> err )));
2307
+
2308
+ slot = MakeSingleTupleTableSlot (res -> tupledesc , & TTSOpsMinimalTuple );
2309
+ if (!tuplestore_gettupleslot (res -> tuplestore , true, false, slot ))
2310
+ elog (ERROR , "failed to fetch tuple for the recovery progress" );
2311
+
2312
+ remote_in_recovery = DatumGetBool (slot_getattr (slot , 1 , & isnull ));
2313
+
2314
+ if (remote_in_recovery )
2315
+ ereport (ERROR ,
2316
+ errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
2317
+ errmsg ("cannot enable retain_conflict_info if the publisher is in recovery." ));
2318
+
2319
+ ExecDropSingleTupleTableSlot (slot );
2320
+
2321
+ walrcv_clear_result (res );
2322
+ }
2323
+
2199
2324
/*
2200
2325
* Get the list of tables which belong to specified publications on the
2201
2326
* publisher connection.
0 commit comments