For cascading replication, wake physical and logical walsenders separately
authorAndres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 07:24:24 +0000 (00:24 -0700)
committerAndres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 08:06:00 +0000 (01:06 -0700)
Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would have been not applied yet, so logical walsenders were
awakened too early.

Per idea from Jeff Davis and Amit Kapila.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-By: Jeff Davis <pgsql@j-davis.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com

src/backend/access/transam/xlog.c
src/backend/access/transam/xlogarchive.c
src/backend/access/transam/xlogrecovery.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/replication/walsender.h
src/include/replication/walsender_private.h

index 13f83dd57d6c6b8fc3c1f75fa6262ca8b06bb8b2..1b7c2f23a41b092a0c28ecd00dab295c5a0d8c70 100644 (file)
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
    END_CRIT_SECTION();
 
    /* wake up walsenders now that we've released heavily contended locks */
-   WalSndWakeupProcessRequests();
+   WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
    /*
     * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
    END_CRIT_SECTION();
 
    /* wake up walsenders now that we've released heavily contended locks */
-   WalSndWakeupProcessRequests();
+   WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
    /*
     * Great, done. To take some work off the critical path, try to initialize
@@ -5765,7 +5765,7 @@ StartupXLOG(void)
     * If there were cascading standby servers connected to us, nudge any wal
     * sender processes to notice that we've been promoted.
     */
-   WalSndWakeup();
+   WalSndWakeup(true, true);
 
    /*
     * If this was a promotion, request an (online) checkpoint now. This isn't
index a0f5aa24b58a9dbefb52e40944969eae915812f8..f3fb92c8f96f97450eea849d714b3119ead290d2 100644 (file)
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
     * if we restored something other than a WAL segment, but it does no harm
     * either.
     */
-   WalSndWakeup();
+   WalSndWakeup(true, false);
 }
 
 /*
index dbe9394762768e1523e4f993020e9b05c56c9805..02d1b2cd6d86ddd5ffd484b7344168cfc6e15bdd 100644 (file)
@@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
    XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
    SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+   /* ------
+    * Wakeup walsenders:
+    *
+    * On the standby, the WAL is flushed first (which will only wake up
+    * physical walsenders) and then applied, which will only wake up logical
+    * walsenders.
+    *
+    * Indeed, logical walsenders on standby can't decode and send data until
+    * it's been applied.
+    *
+    * Physical walsenders don't need to be woken up during replay unless
+    * cascading replication is allowed and time line change occurred (so that
+    * they can notice that they are on a new time line).
+    *
+    * That's why the wake up conditions are for:
+    *
+    *  - physical walsenders in case of new time line and cascade
+    *    replication is allowed
+    *  - logical walsenders in case cascade replication is allowed (could not
+    *    be created otherwise)
+    * ------
+    */
+   if (AllowCascadeReplication())
+       WalSndWakeup(switchedTLI, true);
+
    /*
     * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
     * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
         */
        RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-       /*
-        * Wake up any walsenders to notice that we are on a new timeline.
-        */
-       if (AllowCascadeReplication())
-           WalSndWakeup();
-
        /* Reset the prefetcher. */
        XLogPrefetchReconfigure();
    }
@@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
        {
            /*
             * When we find that WAL ends in an incomplete record, keep track
-            * of that record.  After recovery is done, we'll write a record to
-            * indicate to downstream WAL readers that that portion is to be
-            * ignored.
+            * of that record.  After recovery is done, we'll write a record
+            * to indicate to downstream WAL readers that that portion is to
+            * be ignored.
             *
             * However, when ArchiveRecoveryRequested = true, we're going to
             * switch to a new timeline at the end of recovery. We will only
index 685af51d5d3ec58f1bf14426944f84f071daea10..feff709435113411c8bdf7586d85ac42eec7ca1f 100644 (file)
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
        /* Signal the startup process and walsender that new WAL has arrived */
        WakeupRecovery();
        if (AllowCascadeReplication())
-           WalSndWakeup();
+           WalSndWakeup(true, false);
 
        /* Report XLOG streaming progress in PS display */
        if (update_process_title)
index e40a9b1ba7b1a4d7d0ac94aa85d9b3d8693c1264..5423cf0a171d2710d1493661650d914292e0e37d 100644 (file)
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
            walsnd->sync_standby_priority = 0;
            walsnd->latch = &MyProc->procLatch;
            walsnd->replyTime = 0;
+
+           /*
+            * The kind assignment is done here and not in StartReplication()
+            * and StartLogicalReplication(). Indeed, the logical walsender
+            * needs to read WAL records (like snapshot of running
+            * transactions) during the slot creation. So it needs to be woken
+            * up based on its kind.
+            *
+            * The kind assignment could also be done in StartReplication(),
+            * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+            * seems better to set it on one place.
+            */
+           if (MyDatabaseId == InvalidOid)
+               walsnd->kind = REPLICATION_KIND_PHYSICAL;
+           else
+               walsnd->kind = REPLICATION_KIND_LOGICAL;
+
            SpinLockRelease(&walsnd->mutex);
            /* don't need the lock anymore */
            MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both kinds of walsenders
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ *   applied
+ *
+ * For cascading replication we need to wake up physical walsenders separately
+ * from logical walsenders (see the comment before calling WalSndWakeup() in
+ * ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
    int         i;
 
    for (i = 0; i < max_wal_senders; i++)
    {
        Latch      *latch;
+       ReplicationKind kind;
        WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
        /*
         * Get latch pointer with spinlock held, for the unlikely case that
-        * pointer reads aren't atomic (as they're 8 bytes).
+        * pointer reads aren't atomic (as they're 8 bytes). While at it, also
+        * get kind.
         */
        SpinLockAcquire(&walsnd->mutex);
        latch = walsnd->latch;
+       kind = walsnd->kind;
        SpinLockRelease(&walsnd->mutex);
 
-       if (latch != NULL)
+       if (latch == NULL)
+           continue;
+
+       if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+           (logical && kind == REPLICATION_KIND_LOGICAL))
            SetLatch(latch);
    }
 }
index 52bb3e2aae30f9a60aa8d7b5096004338f914f30..9df7e50f9430544539020af58b9acdd0dbcd6859 100644 (file)
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()      \
-   do                                      \
-   {                                       \
-       if (wake_wal_senders)               \
-       {                                   \
-           wake_wal_senders = false;       \
-           if (max_wal_senders > 0)        \
-               WalSndWakeup();             \
-       }                                   \
-   } while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+   if (wake_wal_senders)
+   {
+       wake_wal_senders = false;
+       if (max_wal_senders > 0)
+           WalSndWakeup(physical, logical);
+   }
+}
 
 #endif                         /* _WALSENDER_H */
index 5310e054c488777f12ea9d3e60967401d8584f93..ff25aa70a8939f8b5cb559f5387e4d9819dcd74c 100644 (file)
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
     * Timestamp of the last message received from standby.
     */
    TimestampTz replyTime;
+
+   ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;