@@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
424
424
slot -> candidate_restart_valid = InvalidXLogRecPtr ;
425
425
slot -> candidate_restart_lsn = InvalidXLogRecPtr ;
426
426
slot -> last_saved_confirmed_flush = InvalidXLogRecPtr ;
427
+ slot -> restart_lsn_flushed = InvalidXLogRecPtr ;
427
428
slot -> inactive_since = 0 ;
428
429
429
430
/*
@@ -1165,20 +1166,36 @@ ReplicationSlotsComputeRequiredLSN(void)
1165
1166
{
1166
1167
ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1167
1168
XLogRecPtr restart_lsn ;
1169
+ XLogRecPtr restart_lsn_flushed ;
1168
1170
bool invalidated ;
1171
+ ReplicationSlotPersistency persistency ;
1169
1172
1170
1173
if (!s -> in_use )
1171
1174
continue ;
1172
1175
1173
1176
SpinLockAcquire (& s -> mutex );
1177
+ persistency = s -> data .persistency ;
1174
1178
restart_lsn = s -> data .restart_lsn ;
1175
1179
invalidated = s -> data .invalidated != RS_INVAL_NONE ;
1180
+ restart_lsn_flushed = s -> restart_lsn_flushed ;
1176
1181
SpinLockRelease (& s -> mutex );
1177
1182
1178
1183
/* invalidated slots need not apply */
1179
1184
if (invalidated )
1180
1185
continue ;
1181
1186
1187
+ /* Get the flushed restart_lsn for the persistent slot to compute
1188
+ * the oldest LSN for WAL segments removals.
1189
+ */
1190
+ if (persistency == RS_PERSISTENT )
1191
+ {
1192
+ if (restart_lsn_flushed != InvalidXLogRecPtr &&
1193
+ restart_lsn > restart_lsn_flushed )
1194
+ {
1195
+ restart_lsn = restart_lsn_flushed ;
1196
+ }
1197
+ }
1198
+
1182
1199
if (restart_lsn != InvalidXLogRecPtr &&
1183
1200
(min_required == InvalidXLogRecPtr ||
1184
1201
restart_lsn < min_required ))
@@ -1216,7 +1233,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
1216
1233
{
1217
1234
ReplicationSlot * s ;
1218
1235
XLogRecPtr restart_lsn ;
1236
+ XLogRecPtr restart_lsn_flushed ;
1219
1237
bool invalidated ;
1238
+ ReplicationSlotPersistency persistency ;
1220
1239
1221
1240
s = & ReplicationSlotCtl -> replication_slots [i ];
1222
1241
@@ -1230,14 +1249,28 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
1230
1249
1231
1250
/* read once, it's ok if it increases while we're checking */
1232
1251
SpinLockAcquire (& s -> mutex );
1252
+ persistency = s -> data .persistency ;
1233
1253
restart_lsn = s -> data .restart_lsn ;
1234
1254
invalidated = s -> data .invalidated != RS_INVAL_NONE ;
1255
+ restart_lsn_flushed = s -> restart_lsn_flushed ;
1235
1256
SpinLockRelease (& s -> mutex );
1236
1257
1237
1258
/* invalidated slots need not apply */
1238
1259
if (invalidated )
1239
1260
continue ;
1240
1261
1262
+ /* Get the flushed restart_lsn for the persistent slot to compute
1263
+ * the oldest LSN for WAL segments removals.
1264
+ */
1265
+ if (persistency == RS_PERSISTENT )
1266
+ {
1267
+ if (restart_lsn_flushed != InvalidXLogRecPtr &&
1268
+ restart_lsn > restart_lsn_flushed )
1269
+ {
1270
+ restart_lsn = restart_lsn_flushed ;
1271
+ }
1272
+ }
1273
+
1241
1274
if (restart_lsn == InvalidXLogRecPtr )
1242
1275
continue ;
1243
1276
@@ -1455,6 +1488,7 @@ ReplicationSlotReserveWal(void)
1455
1488
1456
1489
Assert (slot != NULL );
1457
1490
Assert (slot -> data .restart_lsn == InvalidXLogRecPtr );
1491
+ Assert (slot -> restart_lsn_flushed == InvalidXLogRecPtr );
1458
1492
1459
1493
/*
1460
1494
* The replication slot mechanism is used to prevent removal of required
@@ -1766,6 +1800,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1766
1800
*/
1767
1801
SpinLockAcquire (& s -> mutex );
1768
1802
1803
+ Assert (s -> data .restart_lsn >= s -> restart_lsn_flushed );
1804
+
1769
1805
restart_lsn = s -> data .restart_lsn ;
1770
1806
1771
1807
/* we do nothing if the slot is already invalid */
@@ -1835,7 +1871,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1835
1871
* just rely on .invalidated.
1836
1872
*/
1837
1873
if (invalidation_cause == RS_INVAL_WAL_REMOVED )
1874
+ {
1838
1875
s -> data .restart_lsn = InvalidXLogRecPtr ;
1876
+ s -> restart_lsn_flushed = InvalidXLogRecPtr ;
1877
+ }
1839
1878
1840
1879
/* Let caller know */
1841
1880
* invalidated = true;
@@ -2354,6 +2393,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2354
2393
if (!slot -> just_dirtied )
2355
2394
slot -> dirty = false;
2356
2395
slot -> last_saved_confirmed_flush = cp .slotdata .confirmed_flush ;
2396
+ slot -> restart_lsn_flushed = cp .slotdata .restart_lsn ;
2357
2397
SpinLockRelease (& slot -> mutex );
2358
2398
2359
2399
LWLockRelease (& slot -> io_in_progress_lock );
@@ -2569,6 +2609,7 @@ RestoreSlotFromDisk(const char *name)
2569
2609
slot -> effective_xmin = cp .slotdata .xmin ;
2570
2610
slot -> effective_catalog_xmin = cp .slotdata .catalog_xmin ;
2571
2611
slot -> last_saved_confirmed_flush = cp .slotdata .confirmed_flush ;
2612
+ slot -> restart_lsn_flushed = cp .slotdata .restart_lsn ;
2572
2613
2573
2614
slot -> candidate_catalog_xmin = InvalidTransactionId ;
2574
2615
slot -> candidate_xmin_lsn = InvalidXLogRecPtr ;
0 commit comments