Suppress useless wakeups in walreceiver.
author Thomas Munro <tmunro@postgresql.org>
Tue, 8 Nov 2022 07:36:36 +0000 (20:36 +1300)
committer Thomas Munro <tmunro@postgresql.org>
Tue, 8 Nov 2022 07:36:36 +0000 (20:36 +1300)
Instead of waking up 10 times per second to check for various timeout
conditions, keep track of when we next have periodic work to do.

Author: Thomas Munro <thomas.munro@gmail.com>
Author: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/CA%2BhUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA%40mail.gmail.com

src/backend/replication/walreceiver.c
src/tools/pgindent/typedefs.list

index 6cbb67c92a30215607bdf84855f6970823a3b56d..8bd2ba37ddfce6ca8171908cf1771a9bc1208907 100644 (file)
@@ -95,8 +95,6 @@ bool          hot_standby_feedback;
 static WalReceiverConn *wrconn = NULL;
 WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
-#define NAPTIME_PER_CYCLE 100  /* max sleep time between cycles (100ms) */
-
 /*
  * These variables are used similarly to openLogFile/SegNo,
  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -116,6 +114,23 @@ static struct
        XLogRecPtr      Flush;                  /* last byte + 1 flushed in the standby */
 }                      LogstreamResult;
 
+/*
+ * Reasons to wake up and perform periodic tasks; each indexes wakeup[].
+ */
+typedef enum WalRcvWakeupReason
+{
+       WALRCV_WAKEUP_TERMINATE,        /* wal_receiver_timeout after last receipt */
+       WALRCV_WAKEUP_PING,             /* half of wal_receiver_timeout: request a reply */
+       WALRCV_WAKEUP_REPLY,            /* wal_receiver_status_interval: status reply due */
+       WALRCV_WAKEUP_HSFEEDBACK,       /* wal_receiver_status_interval: feedback due */
+       NUM_WALRCV_WAKEUPS              /* number of reasons; sizes wakeup[] */
+} WalRcvWakeupReason;
+
+/*
+ * Next wakeup time per WalRcvWakeupReason; PG_INT64_MAX when not scheduled.
+ */
+static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
 
 /*
  * Process any interrupts the walreceiver process may have received.
@@ -179,9 +195,7 @@ WalReceiverMain(void)
        TimeLineID      primaryTLI;
        bool            first_stream;
        WalRcvData *walrcv = WalRcv;
-       TimestampTz last_recv_timestamp;
-       TimestampTz starttime;
-       bool            ping_sent;
+       TimestampTz now;
        char       *err;
        char       *sender_host = NULL;
        int                     sender_port = 0;
@@ -192,7 +206,7 @@ WalReceiverMain(void)
         */
        Assert(walrcv != NULL);
 
-       starttime = GetCurrentTimestamp();
+       now = GetCurrentTimestamp();
 
        /*
         * Mark walreceiver as running in shared memory.
@@ -248,7 +262,7 @@ WalReceiverMain(void)
 
        /* Initialise to a sanish value */
        walrcv->lastMsgSendTime =
-               walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime;
+               walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
 
        /* Report the latch to use to awaken this process */
        walrcv->latch = &MyProc->procLatch;
@@ -414,9 +428,10 @@ WalReceiverMain(void)
                        initStringInfo(&reply_message);
                        initStringInfo(&incoming_message);
 
-                       /* Initialize the last recv timestamp */
-                       last_recv_timestamp = GetCurrentTimestamp();
-                       ping_sent = false;
+                       /* Initialize nap wakeup times. */
+                       now = GetCurrentTimestamp();
+                       for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+                               WalRcvComputeNextWakeup(i, now);
 
                        /* Loop until end-of-streaming or error */
                        for (;;)
@@ -426,6 +441,8 @@ WalReceiverMain(void)
                                bool            endofwal = false;
                                pgsocket        wait_fd = PGINVALID_SOCKET;
                                int                     rc;
+                               TimestampTz nextWakeup;
+                               int                     nap;
 
                                /*
                                 * Exit walreceiver if we're not in recovery. This should not
@@ -443,11 +460,15 @@ WalReceiverMain(void)
                                {
                                        ConfigReloadPending = false;
                                        ProcessConfigFile(PGC_SIGHUP);
+                                       now = GetCurrentTimestamp();
+                                       for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+                                               WalRcvComputeNextWakeup(i, now);
                                        XLogWalRcvSendHSFeedback(true);
                                }
 
                                /* See if we can read data immediately */
                                len = walrcv_receive(wrconn, &buf, &wait_fd);
+                               now = GetCurrentTimestamp();
                                if (len != 0)
                                {
                                        /*
@@ -459,11 +480,12 @@ WalReceiverMain(void)
                                                if (len > 0)
                                                {
                                                        /*
-                                                        * Something was received from primary, so reset
-                                                        * timeout
+                                                        * Something was received from primary, so adjust
+                                                        * the ping and terminate wakeup times.
                                                         */
-                                                       last_recv_timestamp = GetCurrentTimestamp();
-                                                       ping_sent = false;
+                                                       WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
+                                                                                                       now);
+                                                       WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
                                                        XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
                                                                                                 startpointTLI);
                                                }
@@ -480,6 +502,7 @@ WalReceiverMain(void)
                                                        break;
                                                }
                                                len = walrcv_receive(wrconn, &buf, &wait_fd);
+                                               now = GetCurrentTimestamp();
                                        }
 
                                        /* Let the primary know that we received some data. */
@@ -497,6 +520,20 @@ WalReceiverMain(void)
                                if (endofwal)
                                        break;
 
+                               /* Find the soonest wakeup time, to limit our nap. */
+                               nextWakeup = PG_INT64_MAX;
+                               for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+                                       nextWakeup = Min(wakeup[i], nextWakeup);
+
+                               /*
+                                * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
+                                * timeouts longer than INT_MAX milliseconds, so we limit the
+                                * result accordingly.  Also, we round up to the next
+                                * millisecond to avoid waking up too early and spinning until
+                                * one of the wakeup times.
+                                */
+                               nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+
                                /*
                                 * Ideally we would reuse a WaitEventSet object repeatedly
                                 * here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -513,8 +550,9 @@ WalReceiverMain(void)
                                                                           WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
                                                                           WL_TIMEOUT | WL_LATCH_SET,
                                                                           wait_fd,
-                                                                          NAPTIME_PER_CYCLE,
+                                                                          nap,
                                                                           WAIT_EVENT_WAL_RECEIVER_MAIN);
+                               now = GetCurrentTimestamp();
                                if (rc & WL_LATCH_SET)
                                {
                                        ResetLatch(MyLatch);
@@ -550,34 +588,19 @@ WalReceiverMain(void)
                                         * Check if time since last receive from primary has
                                         * reached the configured limit.
                                         */
-                                       if (wal_receiver_timeout > 0)
-                                       {
-                                               TimestampTz now = GetCurrentTimestamp();
-                                               TimestampTz timeout;
-
-                                               timeout =
-                                                       TimestampTzPlusMilliseconds(last_recv_timestamp,
-                                                                                                               wal_receiver_timeout);
+                                       if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_CONNECTION_FAILURE),
+                                                                errmsg("terminating walreceiver due to timeout")));
 
-                                               if (now >= timeout)
-                                                       ereport(ERROR,
-                                                                       (errcode(ERRCODE_CONNECTION_FAILURE),
-                                                                        errmsg("terminating walreceiver due to timeout")));
-
-                                               /*
-                                                * We didn't receive anything new, for half of
-                                                * receiver replication timeout. Ping the server.
-                                                */
-                                               if (!ping_sent)
-                                               {
-                                                       timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-                                                                                                                                 (wal_receiver_timeout / 2));
-                                                       if (now >= timeout)
-                                                       {
-                                                               requestReply = true;
-                                                               ping_sent = true;
-                                                       }
-                                               }
+                                       /*
+                                        * We didn't receive anything new, for half of receiver
+                                        * replication timeout. Ping the server.
+                                        */
+                                       if (now >= wakeup[WALRCV_WAKEUP_PING])
+                                       {
+                                               requestReply = true;
+                                               wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
                                        }
 
                                        XLogWalRcvSendReply(requestReply, requestReply);
@@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
        static XLogRecPtr writePtr = 0;
        static XLogRecPtr flushPtr = 0;
        XLogRecPtr      applyPtr;
-       static TimestampTz sendTime = 0;
        TimestampTz now;
 
        /*
@@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
        if (!force
                && writePtr == LogstreamResult.Write
                && flushPtr == LogstreamResult.Flush
-               && !TimestampDifferenceExceeds(sendTime, now,
-                                                                          wal_receiver_status_interval * 1000))
+               && now < wakeup[WALRCV_WAKEUP_REPLY])
                return;
-       sendTime = now;
+
+       /* Make sure we wake up when it's time to send another reply. */
+       WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
 
        /* Construct a new message */
        writePtr = LogstreamResult.Write;
@@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed)
                                catalog_xmin_epoch;
        TransactionId xmin,
                                catalog_xmin;
-       static TimestampTz sendTime = 0;
 
        /* initially true so we always send at least one feedback message */
        static bool primary_has_standby_xmin = true;
@@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed)
        /* Get current timestamp. */
        now = GetCurrentTimestamp();
 
-       if (!immed)
-       {
-               /*
-                * Send feedback at most once per wal_receiver_status_interval.
-                */
-               if (!TimestampDifferenceExceeds(sendTime, now,
-                                                                               wal_receiver_status_interval * 1000))
-                       return;
-               sendTime = now;
-       }
+       /* Send feedback at most once per wal_receiver_status_interval. */
+       if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
+               return;
+
+       /* Make sure we wake up when it's time to send feedback again. */
+       WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
 
        /*
         * If Hot Standby is not yet accepting connections there is nothing to
@@ -1285,6 +1303,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
        }
 }
 
+/*
+ * Compute the next wakeup time for 'reason', measured from 'now', and store
+ * it in wakeup[reason].  Call this to initialize a wakeup time, to reschedule
+ * it for the next wakeup, or after GUCs changed.  PG_INT64_MAX = disabled.
+ */
+static void
+WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
+{
+       switch (reason)
+       {
+               case WALRCV_WAKEUP_TERMINATE:
+                       if (wal_receiver_timeout <= 0)  /* timeout disabled */
+                               wakeup[reason] = PG_INT64_MAX;
+                       else    /* scale by 1000 in 64-bit arithmetic */
+                               wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+                       break;
+               case WALRCV_WAKEUP_PING:
+                       if (wal_receiver_timeout <= 0)  /* no timeout, so no ping */
+                               wakeup[reason] = PG_INT64_MAX;
+                       else    /* ping is due at half of the terminate timeout */
+                               wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+                       break;
+               case WALRCV_WAKEUP_HSFEEDBACK:
+                       if (!hot_standby_feedback || wal_receiver_status_interval <= 0) /* off */
+                               wakeup[reason] = PG_INT64_MAX;
+                       else    /* scale by 1000000 in 64-bit arithmetic */
+                               wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+                       break;
+               case WALRCV_WAKEUP_REPLY:
+                       if (wal_receiver_status_interval <= 0)  /* periodic replies disabled */
+                               wakeup[reason] = PG_INT64_MAX;
+                       else    /* same interval as hot standby feedback */
+                               wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+                       break;
+               default:        /* e.g. NUM_WALRCV_WAKEUPS: leave wakeup[] untouched */
+                       break;
+       }
+}
+
 /*
  * Wake up the walreceiver main loop.
  *
index 9683b0a88e527667b950d8c11373da22a7d6ed89..245aea1dd14e584b7c1abef6df9579fcd7f0f7ae 100644 (file)
@@ -2927,6 +2927,7 @@ WALInsertLock
 WALInsertLockPadded
 WALOpenSegment
 WALReadError
+WalRcvWakeupReason
 WALSegmentCloseCB
 WALSegmentContext
 WALSegmentOpenCB