Prevent use of invalidated logical slot in CreateDecodingContext()
authorAndres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 05:19:05 +0000 (22:19 -0700)
committerAndres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 05:19:05 +0000 (22:19 -0700)
Previously we had checks for this in multiple places. Support for logical
decoding on standbys will add other forms of invalidation, making it worth
while to centralize the checks.

This slightly changes the error message for both the walsender and SQL
interface. Particularly the SQL interface error was inaccurate, as the "This
slot has never previously reserved WAL" portion was unreachable.

Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de

src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/walsender.c

index c3ec97a0a623c6c65f03130f45eecc1b78200bf5..6082d222d5d4d0ca51d2bceb6f43aabbd63d410e 100644 (file)
@@ -518,6 +518,22 @@ CreateDecodingContext(XLogRecPtr start_lsn,
                                 errmsg("replication slot \"%s\" was not created in this database",
                                                NameStr(slot->data.name))));
 
+       /*
+        * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
+        * "cannot get changes" wording in this errmsg because that'd be
+        * confusingly ambiguous about no changes being available when called from
+        * pg_logical_slot_get_changes_guts().
+        */
+       if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("can no longer get changes from replication slot \"%s\"",
+                                               NameStr(MyReplicationSlot->data.name)),
+                                errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
+
+       Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
+       Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
+
        if (start_lsn == InvalidXLogRecPtr)
        {
                /* continue from last position */
index fa1b641a2b02ef05436b0cbc05fb537e76ba5095..55a24c02c9480b5d4a4ea7e0964245e803ae9753 100644 (file)
@@ -214,19 +214,6 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
                                                                        LogicalOutputPrepareWrite,
                                                                        LogicalOutputWrite, NULL);
 
-               /*
-                * After the sanity checks in CreateDecodingContext, make sure the
-                * restart_lsn is valid.  Avoid "cannot get changes" wording in this
-                * errmsg because that'd be confusingly ambiguous about no changes
-                * being available.
-                */
-               if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                        errmsg("can no longer get changes from replication slot \"%s\"",
-                                                       NameStr(*name)),
-                                        errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
-
                MemoryContextSwitchTo(oldcontext);
 
                /*
index 75e8363e24812577b99e6918249fe91dcb64eea1..e40a9b1ba7b1a4d7d0ac94aa85d9b3d8693c1264 100644 (file)
@@ -1253,13 +1253,6 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
        ReplicationSlotAcquire(cmd->slotname, true);
 
-       if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
-               ereport(ERROR,
-                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                errmsg("cannot read from logical replication slot \"%s\"",
-                                               cmd->slotname),
-                                errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
        /*
         * Force a disconnect, so that the decoding code doesn't need to care
         * about an eventual switch from running in recovery, to running in a