Fix the review comments and a bug in the slot sync code.
author: Amit Kapila <akapila@postgresql.org>
Fri, 12 Apr 2024 09:33:28 +0000 (15:03 +0530)
committer: Amit Kapila <akapila@postgresql.org>
Fri, 12 Apr 2024 09:40:41 +0000 (15:10 +0530)
Ensure that when updating the catalog_xmin of the synced slots, it is
first written to disk before changing the in-memory value
(effective_catalog_xmin). This is to prevent a scenario where the
in-memory value change triggers a vacuum to remove catalog tuples before
the catalog_xmin is written to disk. In the event of a crash before the
catalog_xmin is persisted, we would not know that some required catalog
tuples have been removed and the synced slot would be invalidated.

Change the sanity check to ensure that remote_slot's confirmed_flush LSN
can't precede that of the local/synced slot during slot sync. Note that the
restart_lsn of the synced/local slot can be ahead of that of remote_slot.
This can happen when the slot advancing machinery finds a running xacts
record after reaching the consistent state at a later point than the
primary, where it serializes the snapshot and updates the restart_lsn.

Make the check to sync slots more robust by syncing only when the
confirmed_lsn, restart_lsn, or catalog_xmin of the remote slot is ahead of
that of the synced/local slot.

Reported-by: Amit Kapila and Shveta Malik
Author: Hou Zhijie, Shveta Malik
Reviewed-by: Amit Kapila, Bertrand Drouvot
Discussion: https://postgr.es/m/OS0PR01MB57162B67D3CB01B2756FBA6D94062@OS0PR01MB5716.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/CAJpy0uCSS5zmdyUXhvw41HSdTbRqX1hbYqkOfHNj7qQ+2zn0AQ@mail.gmail.com

src/backend/replication/logical/slotsync.c

index 97440cb6bf01b96e593d14c0867f79ee80066929..bda0de52db9b29ff7c72e8bec56cdadebf3321da 100644 (file)
@@ -162,22 +162,78 @@ static void update_synced_slots_inactive_since(void);
  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
  * modified, and decoding from the corresponding LSN's can reach a
  * consistent snapshot.
+ *
+ * *remote_slot_precedes will be true if the remote slot's LSN or xmin
+ * precedes locally reserved position.
  */
 static bool
 update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
-                                                bool *found_consistent_snapshot)
+                                                bool *found_consistent_snapshot,
+                                                bool *remote_slot_precedes)
 {
        ReplicationSlot *slot = MyReplicationSlot;
-       bool            slot_updated = false;
+       bool            updated_xmin_or_lsn = false;
+       bool            updated_config = false;
 
        Assert(slot->data.invalidated == RS_INVAL_NONE);
 
        if (found_consistent_snapshot)
                *found_consistent_snapshot = false;
 
-       if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
-               remote_slot->restart_lsn != slot->data.restart_lsn ||
-               remote_slot->catalog_xmin != slot->data.catalog_xmin)
+       if (remote_slot_precedes)
+               *remote_slot_precedes = false;
+
+       /*
+        * Don't overwrite if we already have a newer catalog_xmin and
+        * restart_lsn.
+        */
+       if (remote_slot->restart_lsn < slot->data.restart_lsn ||
+               TransactionIdPrecedes(remote_slot->catalog_xmin,
+                                                         slot->data.catalog_xmin))
+       {
+               /*
+                * This can happen in following situations:
+                *
+                * If the slot is temporary, it means either the initial WAL location
+                * reserved for the local slot is ahead of the remote slot's
+                * restart_lsn or the initial xmin_horizon computed for the local slot
+                * is ahead of the remote slot.
+                *
+                * If the slot is persistent, restart_lsn of the synced slot could
+                * still be ahead of the remote slot. Since we use slot advance
+                * functionality to keep snapbuild/slot updated, it is possible that
+                * the restart_lsn is advanced to a later position than it has on the
+                * primary. This can happen when slot advancing machinery finds
+                * running xacts record after reaching the consistent state at a later
+                * point than the primary where it serializes the snapshot and updates
+                * the restart_lsn.
+                *
+                * We LOG the message if the slot is temporary as it can help the user
+                * to understand why the slot is not sync-ready. In the case of a
+                * persistent slot, it would be a more common case and won't directly
+                * impact the users, so we used DEBUG1 level to log the message.
+                */
+               ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
+                               errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
+                                          remote_slot->name),
+                               errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
+                                                 LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+                                                 remote_slot->catalog_xmin,
+                                                 LSN_FORMAT_ARGS(slot->data.restart_lsn),
+                                                 slot->data.catalog_xmin));
+
+               if (remote_slot_precedes)
+                       *remote_slot_precedes = true;
+       }
+
+       /*
+        * Attempt to sync LSNs and xmins only if remote slot is ahead of local
+        * slot.
+        */
+       else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
+                        remote_slot->restart_lsn > slot->data.restart_lsn ||
+                        TransactionIdFollows(remote_slot->catalog_xmin,
+                                                                 slot->data.catalog_xmin))
        {
                /*
                 * We can't directly copy the remote slot's LSN or xmin unless there
@@ -198,7 +254,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                        slot->data.restart_lsn = remote_slot->restart_lsn;
                        slot->data.confirmed_flush = remote_slot->confirmed_lsn;
                        slot->data.catalog_xmin = remote_slot->catalog_xmin;
-                       slot->effective_catalog_xmin = remote_slot->catalog_xmin;
                        SpinLockRelease(&slot->mutex);
 
                        if (found_consistent_snapshot)
@@ -208,12 +263,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                {
                        LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
                                                                                                found_consistent_snapshot);
-               }
 
-               ReplicationSlotsComputeRequiredXmin(false);
-               ReplicationSlotsComputeRequiredLSN();
+                       /* Sanity check */
+                       if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
+                               ereport(ERROR,
+                                               errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
+                                                                               remote_slot->name),
+                                               errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+                                                                                  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+                                                                                  LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
+               }
 
-               slot_updated = true;
+               updated_xmin_or_lsn = true;
        }
 
        if (remote_dbid != slot->data.database ||
@@ -233,10 +294,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
                slot->data.failover = remote_slot->failover;
                SpinLockRelease(&slot->mutex);
 
-               slot_updated = true;
+               updated_config = true;
        }
 
-       return slot_updated;
+       /*
+        * We have to write the changed xmin to disk *before* we change the
+        * in-memory value, otherwise after a crash we wouldn't know that some
+        * catalog tuples might have been removed already.
+        */
+       if (updated_config || updated_xmin_or_lsn)
+       {
+               ReplicationSlotMarkDirty();
+               ReplicationSlotSave();
+       }
+
+       /*
+        * Now the new xmin is safely on disk, we can let the global value
+        * advance. We do not take ProcArrayLock or similar since we only advance
+        * xmin here and there's not much harm done by a concurrent computation
+        * missing that.
+        */
+       if (updated_xmin_or_lsn)
+       {
+               SpinLockAcquire(&slot->mutex);
+               slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+               SpinLockRelease(&slot->mutex);
+
+               ReplicationSlotsComputeRequiredXmin(false);
+               ReplicationSlotsComputeRequiredLSN();
+       }
+
+       return updated_config || updated_xmin_or_lsn;
 }
 
 /*
@@ -460,14 +548,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
        ReplicationSlot *slot = MyReplicationSlot;
        bool            found_consistent_snapshot = false;
+       bool            remote_slot_precedes = false;
+
+       (void) update_local_synced_slot(remote_slot, remote_dbid,
+                                                                       &found_consistent_snapshot,
+                                                                       &remote_slot_precedes);
 
        /*
         * Check if the primary server has caught up. Refer to the comment atop
         * the file for details on this check.
         */
-       if (remote_slot->restart_lsn < slot->data.restart_lsn ||
-               TransactionIdPrecedes(remote_slot->catalog_xmin,
-                                                         slot->data.catalog_xmin))
+       if (remote_slot_precedes)
        {
                /*
                 * The remote slot didn't catch up to locally reserved position.
@@ -476,23 +567,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
                 * current location when recreating the slot in the next cycle. It may
                 * take more time to create such a slot. Therefore, we keep this slot
                 * and attempt the synchronization in the next cycle.
-                *
-                * XXX should this be changed to elog(DEBUG1) perhaps?
                 */
-               ereport(LOG,
-                               errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
-                                          remote_slot->name),
-                               errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
-                                                 LSN_FORMAT_ARGS(remote_slot->restart_lsn),
-                                                 remote_slot->catalog_xmin,
-                                                 LSN_FORMAT_ARGS(slot->data.restart_lsn),
-                                                 slot->data.catalog_xmin));
                return false;
        }
 
-       (void) update_local_synced_slot(remote_slot, remote_dbid,
-                                                                       &found_consistent_snapshot);
-
        /*
         * Don't persist the slot if it cannot reach the consistent point from the
         * restart_lsn. See comments atop this file.
@@ -633,23 +711,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
                        /*
                         * Sanity check: As long as the invalidations are handled
                         * appropriately as above, this should never happen.
+                        *
+                        * We don't need to check restart_lsn here. See the comments in
+                        * update_local_synced_slot() for details.
                         */
-                       if (remote_slot->restart_lsn < slot->data.restart_lsn)
-                               elog(ERROR,
-                                        "cannot synchronize local slot \"%s\" LSN(%X/%X)"
-                                        " to remote slot's LSN(%X/%X) as synchronization"
-                                        " would move it backwards", remote_slot->name,
-                                        LSN_FORMAT_ARGS(slot->data.restart_lsn),
-                                        LSN_FORMAT_ARGS(remote_slot->restart_lsn));
-
-                       /* Make sure the slot changes persist across server restart */
-                       if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
-                       {
-                               ReplicationSlotMarkDirty();
-                               ReplicationSlotSave();
-
-                               slot_updated = true;
-                       }
+                       if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
+                               ereport(ERROR,
+                                               errmsg_internal("cannot synchronize local slot \"%s\"",
+                                                                               remote_slot->name),
+                                               errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+                                                                                  LSN_FORMAT_ARGS(slot->data.confirmed_flush),
+                                                                                  LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
+
+                       slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
+                                                                                                       NULL, NULL);
                }
        }
        /* Otherwise create the slot first. */