For cascading replication, wake physical and logical walsenders separately
author: Andres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 07:24:24 +0000 (00:24 -0700)
committer: Andres Freund <andres@anarazel.de>
Sat, 8 Apr 2023 08:06:00 +0000 (01:06 -0700)
Physical walsenders can't send data until it's been flushed; logical
walsenders can't decode and send data until it's been applied. On the
standby, the WAL is flushed first, which will only wake up physical
walsenders; and then applied, which will only wake up logical
walsenders.

Previously, all walsenders were awakened when the WAL was flushed. That
was fine for logical walsenders on the primary; but on the standby the
flushed WAL would not have been applied yet, so logical walsenders were
awakened too early.

Per idea from Jeff Davis and Amit Kapila.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-By: Jeff Davis <pgsql@j-davis.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1+zO5LUeisabX10c81LU-fWMKO4M9Wyg1cdkbW7Hqh6vQ@mail.gmail.com

src/backend/access/transam/xlog.c
src/backend/access/transam/xlogarchive.c
src/backend/access/transam/xlogrecovery.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/include/replication/walsender.h
src/include/replication/walsender_private.h

index 13f83dd57d6c6b8fc3c1f75fa6262ca8b06bb8b2..1b7c2f23a41b092a0c28ecd00dab295c5a0d8c70 100644 (file)
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
        END_CRIT_SECTION();
 
        /* wake up walsenders now that we've released heavily contended locks */
-       WalSndWakeupProcessRequests();
+       WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
        /*
         * Great, done. To take some work off the critical path, try to initialize
@@ -5765,7 +5765,7 @@ StartupXLOG(void)
         * If there were cascading standby servers connected to us, nudge any wal
         * sender processes to notice that we've been promoted.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, true);
 
        /*
         * If this was a promotion, request an (online) checkpoint now. This isn't
index a0f5aa24b58a9dbefb52e40944969eae915812f8..f3fb92c8f96f97450eea849d714b3119ead290d2 100644 (file)
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
         * if we restored something other than a WAL segment, but it does no harm
         * either.
         */
-       WalSndWakeup();
+       WalSndWakeup(true, false);
 }
 
 /*
index dbe9394762768e1523e4f993020e9b05c56c9805..02d1b2cd6d86ddd5ffd484b7344168cfc6e15bdd 100644 (file)
@@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
        XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
        SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+       /* ------
+        * Wakeup walsenders:
+        *
+        * On the standby, the WAL is flushed first (which will only wake up
+        * physical walsenders) and then applied, which will only wake up logical
+        * walsenders.
+        *
+        * Indeed, logical walsenders on standby can't decode and send data until
+        * it's been applied.
+        *
+        * Physical walsenders don't need to be woken up during replay unless
+        * cascading replication is allowed and time line change occurred (so that
+        * they can notice that they are on a new time line).
+        *
+        * That's why the wake up conditions are for:
+        *
+        *  - physical walsenders in case of new time line and cascade
+        *    replication is allowed
+        *  - logical walsenders in case cascade replication is allowed (could not
+        *    be created otherwise)
+        * ------
+        */
+       if (AllowCascadeReplication())
+               WalSndWakeup(switchedTLI, true);
+
        /*
         * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
         * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
                 */
                RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
 
-               /*
-                * Wake up any walsenders to notice that we are on a new timeline.
-                */
-               if (AllowCascadeReplication())
-                       WalSndWakeup();
-
                /* Reset the prefetcher. */
                XLogPrefetchReconfigure();
        }
@@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
                {
                        /*
                         * When we find that WAL ends in an incomplete record, keep track
-                        * of that record.  After recovery is done, we'll write a record to
-                        * indicate to downstream WAL readers that that portion is to be
-                        * ignored.
+                        * of that record.  After recovery is done, we'll write a record
+                        * to indicate to downstream WAL readers that that portion is to
+                        * be ignored.
                         *
                         * However, when ArchiveRecoveryRequested = true, we're going to
                         * switch to a new timeline at the end of recovery. We will only
index 685af51d5d3ec58f1bf14426944f84f071daea10..feff709435113411c8bdf7586d85ac42eec7ca1f 100644 (file)
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
                /* Signal the startup process and walsender that new WAL has arrived */
                WakeupRecovery();
                if (AllowCascadeReplication())
-                       WalSndWakeup();
+                       WalSndWakeup(true, false);
 
                /* Report XLOG streaming progress in PS display */
                if (update_process_title)
index e40a9b1ba7b1a4d7d0ac94aa85d9b3d8693c1264..5423cf0a171d2710d1493661650d914292e0e37d 100644 (file)
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
                        walsnd->sync_standby_priority = 0;
                        walsnd->latch = &MyProc->procLatch;
                        walsnd->replyTime = 0;
+
+                       /*
+                        * The kind assignment is done here and not in StartReplication()
+                        * and StartLogicalReplication(). Indeed, the logical walsender
+                        * needs to read WAL records (like snapshot of running
+                        * transactions) during the slot creation. So it needs to be woken
+                        * up based on its kind.
+                        *
+                        * The kind assignment could also be done in StartReplication(),
+                        * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+                        * seems better to set it on one place.
+                        */
+                       if (MyDatabaseId == InvalidOid)
+                               walsnd->kind = REPLICATION_KIND_PHYSICAL;
+                       else
+                               walsnd->kind = REPLICATION_KIND_LOGICAL;
+
                        SpinLockRelease(&walsnd->mutex);
                        /* don't need the lock anymore */
                        MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
 }
 
 /*
- * Wake up all walsenders
+ * Wake up physical, logical or both kinds of walsenders
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ *   applied
+ *
+ * For cascading replication we need to wake up physical walsenders separately
+ * from logical walsenders (see the comment before calling WalSndWakeup() in
+ * ApplyWalRecord() for more details).
  *
  * This will be called inside critical sections, so throwing an error is not
  * advisable.
  */
 void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
 {
        int                     i;
 
        for (i = 0; i < max_wal_senders; i++)
        {
                Latch      *latch;
+               ReplicationKind kind;
                WalSnd     *walsnd = &WalSndCtl->walsnds[i];
 
                /*
                 * Get latch pointer with spinlock held, for the unlikely case that
-                * pointer reads aren't atomic (as they're 8 bytes).
+                * pointer reads aren't atomic (as they're 8 bytes). While at it, also
+                * get kind.
                 */
                SpinLockAcquire(&walsnd->mutex);
                latch = walsnd->latch;
+               kind = walsnd->kind;
                SpinLockRelease(&walsnd->mutex);
 
-               if (latch != NULL)
+               if (latch == NULL)
+                       continue;
+
+               if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+                       (logical && kind == REPLICATION_KIND_LOGICAL))
                        SetLatch(latch);
        }
 }
index 52bb3e2aae30f9a60aa8d7b5096004338f914f30..9df7e50f9430544539020af58b9acdd0dbcd6859 100644 (file)
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
 extern void WalSndInitStopping(void);
 extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
 /*
  * wakeup walsenders if there is work to be done
  */
-#define WalSndWakeupProcessRequests()          \
-       do                                                                              \
-       {                                                                               \
-               if (wake_wal_senders)                           \
-               {                                                                       \
-                       wake_wal_senders = false;               \
-                       if (max_wal_senders > 0)                \
-                               WalSndWakeup();                         \
-               }                                                                       \
-       } while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+       if (wake_wal_senders)
+       {
+               wake_wal_senders = false;
+               if (max_wal_senders > 0)
+                       WalSndWakeup(physical, logical);
+       }
+}
 
 #endif                                                 /* _WALSENDER_H */
index 5310e054c488777f12ea9d3e60967401d8584f93..ff25aa70a8939f8b5cb559f5387e4d9819dcd74c 100644 (file)
@@ -15,6 +15,7 @@
 #include "access/xlog.h"
 #include "lib/ilist.h"
 #include "nodes/nodes.h"
+#include "nodes/replnodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
         * Timestamp of the last message received from standby.
         */
        TimestampTz replyTime;
+
+       ReplicationKind kind;
 } WalSnd;
 
 extern PGDLLIMPORT WalSnd *MyWalSnd;