Suppress useless wakeups in walreceiver.
authorThomas Munro <tmunro@postgresql.org>
Tue, 8 Nov 2022 07:36:36 +0000 (20:36 +1300)
committerThomas Munro <tmunro@postgresql.org>
Tue, 8 Nov 2022 07:36:36 +0000 (20:36 +1300)
Instead of waking up 10 times per second to check for various timeout
conditions, keep track of when we next have periodic work to do.

Author: Thomas Munro <thomas.munro@gmail.com>
Author: Nathan Bossart <nathandbossart@gmail.com>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/CA%2BhUKGJGhX4r2LPUE3Oy9BX71Eum6PBcS8L3sJpScR9oKaTVaA%40mail.gmail.com

src/backend/replication/walreceiver.c
src/tools/pgindent/typedefs.list

index 6cbb67c92a30215607bdf84855f6970823a3b56d..8bd2ba37ddfce6ca8171908cf1771a9bc1208907 100644 (file)
@@ -95,8 +95,6 @@ bool      hot_standby_feedback;
 static WalReceiverConn *wrconn = NULL;
 WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
-#define NAPTIME_PER_CYCLE 100  /* max sleep time between cycles (100ms) */
-
 /*
  * These variables are used similarly to openLogFile/SegNo,
  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -116,6 +114,23 @@ static struct
    XLogRecPtr  Flush;          /* last byte + 1 flushed in the standby */
 }          LogstreamResult;
 
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum WalRcvWakeupReason
+{
+   WALRCV_WAKEUP_TERMINATE,
+   WALRCV_WAKEUP_PING,
+   WALRCV_WAKEUP_REPLY,
+   WALRCV_WAKEUP_HSFEEDBACK,
+   NUM_WALRCV_WAKEUPS
+} WalRcvWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
 
 /*
  * Process any interrupts the walreceiver process may have received.
@@ -179,9 +195,7 @@ WalReceiverMain(void)
    TimeLineID  primaryTLI;
    bool        first_stream;
    WalRcvData *walrcv = WalRcv;
-   TimestampTz last_recv_timestamp;
-   TimestampTz starttime;
-   bool        ping_sent;
+   TimestampTz now;
    char       *err;
    char       *sender_host = NULL;
    int         sender_port = 0;
@@ -192,7 +206,7 @@ WalReceiverMain(void)
     */
    Assert(walrcv != NULL);
 
-   starttime = GetCurrentTimestamp();
+   now = GetCurrentTimestamp();
 
    /*
     * Mark walreceiver as running in shared memory.
@@ -248,7 +262,7 @@ WalReceiverMain(void)
 
    /* Initialise to a sanish value */
    walrcv->lastMsgSendTime =
-       walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime;
+       walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
 
    /* Report the latch to use to awaken this process */
    walrcv->latch = &MyProc->procLatch;
@@ -414,9 +428,10 @@ WalReceiverMain(void)
            initStringInfo(&reply_message);
            initStringInfo(&incoming_message);
 
-           /* Initialize the last recv timestamp */
-           last_recv_timestamp = GetCurrentTimestamp();
-           ping_sent = false;
+           /* Initialize nap wakeup times. */
+           now = GetCurrentTimestamp();
+           for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+               WalRcvComputeNextWakeup(i, now);
 
            /* Loop until end-of-streaming or error */
            for (;;)
@@ -426,6 +441,8 @@ WalReceiverMain(void)
                bool        endofwal = false;
                pgsocket    wait_fd = PGINVALID_SOCKET;
                int         rc;
+               TimestampTz nextWakeup;
+               int         nap;
 
                /*
                 * Exit walreceiver if we're not in recovery. This should not
@@ -443,11 +460,15 @@ WalReceiverMain(void)
                {
                    ConfigReloadPending = false;
                    ProcessConfigFile(PGC_SIGHUP);
+                   now = GetCurrentTimestamp();
+                   for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+                       WalRcvComputeNextWakeup(i, now);
                    XLogWalRcvSendHSFeedback(true);
                }
 
                /* See if we can read data immediately */
                len = walrcv_receive(wrconn, &buf, &wait_fd);
+               now = GetCurrentTimestamp();
                if (len != 0)
                {
                    /*
@@ -459,11 +480,12 @@ WalReceiverMain(void)
                        if (len > 0)
                        {
                            /*
-                            * Something was received from primary, so reset
-                            * timeout
+                            * Something was received from primary, so adjust
+                            * the ping and terminate wakeup times.
                             */
-                           last_recv_timestamp = GetCurrentTimestamp();
-                           ping_sent = false;
+                           WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
+                                                   now);
+                           WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
                            XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
                                                 startpointTLI);
                        }
@@ -480,6 +502,7 @@ WalReceiverMain(void)
                            break;
                        }
                        len = walrcv_receive(wrconn, &buf, &wait_fd);
+                       now = GetCurrentTimestamp();
                    }
 
                    /* Let the primary know that we received some data. */
@@ -497,6 +520,20 @@ WalReceiverMain(void)
                if (endofwal)
                    break;
 
+               /* Find the soonest wakeup time, to limit our nap. */
+               nextWakeup = PG_INT64_MAX;
+               for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+                   nextWakeup = Min(wakeup[i], nextWakeup);
+
+               /*
+                * Calculate the nap time.  WaitLatchOrSocket() doesn't accept
+                * timeouts longer than INT_MAX milliseconds, so we limit the
+                * result accordingly.  Also, we round up to the next
+                * millisecond to avoid waking up too early and spinning until
+                * one of the wakeup times.
+                */
+               nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+
                /*
                 * Ideally we would reuse a WaitEventSet object repeatedly
                 * here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -513,8 +550,9 @@ WalReceiverMain(void)
                                       WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
                                       WL_TIMEOUT | WL_LATCH_SET,
                                       wait_fd,
-                                      NAPTIME_PER_CYCLE,
+                                      nap,
                                       WAIT_EVENT_WAL_RECEIVER_MAIN);
+               now = GetCurrentTimestamp();
                if (rc & WL_LATCH_SET)
                {
                    ResetLatch(MyLatch);
@@ -550,34 +588,19 @@ WalReceiverMain(void)
                     * Check if time since last receive from primary has
                     * reached the configured limit.
                     */
-                   if (wal_receiver_timeout > 0)
-                   {
-                       TimestampTz now = GetCurrentTimestamp();
-                       TimestampTz timeout;
-
-                       timeout =
-                           TimestampTzPlusMilliseconds(last_recv_timestamp,
-                                                       wal_receiver_timeout);
+                   if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
+                       ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_FAILURE),
+                                errmsg("terminating walreceiver due to timeout")));
 
-                       if (now >= timeout)
-                           ereport(ERROR,
-                                   (errcode(ERRCODE_CONNECTION_FAILURE),
-                                    errmsg("terminating walreceiver due to timeout")));
-
-                       /*
-                        * We didn't receive anything new, for half of
-                        * receiver replication timeout. Ping the server.
-                        */
-                       if (!ping_sent)
-                       {
-                           timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-                                                                 (wal_receiver_timeout / 2));
-                           if (now >= timeout)
-                           {
-                               requestReply = true;
-                               ping_sent = true;
-                           }
-                       }
+                   /*
+                    * We didn't receive anything new, for half of receiver
+                    * replication timeout. Ping the server.
+                    */
+                   if (now >= wakeup[WALRCV_WAKEUP_PING])
+                   {
+                       requestReply = true;
+                       wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
                    }
 
                    XLogWalRcvSendReply(requestReply, requestReply);
@@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
    static XLogRecPtr writePtr = 0;
    static XLogRecPtr flushPtr = 0;
    XLogRecPtr  applyPtr;
-   static TimestampTz sendTime = 0;
    TimestampTz now;
 
    /*
@@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
    if (!force
        && writePtr == LogstreamResult.Write
        && flushPtr == LogstreamResult.Flush
-       && !TimestampDifferenceExceeds(sendTime, now,
-                                      wal_receiver_status_interval * 1000))
+       && now < wakeup[WALRCV_WAKEUP_REPLY])
        return;
-   sendTime = now;
+
+   /* Make sure we wake up when it's time to send another reply. */
+   WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
 
    /* Construct a new message */
    writePtr = LogstreamResult.Write;
@@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed)
                catalog_xmin_epoch;
    TransactionId xmin,
                catalog_xmin;
-   static TimestampTz sendTime = 0;
 
    /* initially true so we always send at least one feedback message */
    static bool primary_has_standby_xmin = true;
@@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed)
    /* Get current timestamp. */
    now = GetCurrentTimestamp();
 
-   if (!immed)
-   {
-       /*
-        * Send feedback at most once per wal_receiver_status_interval.
-        */
-       if (!TimestampDifferenceExceeds(sendTime, now,
-                                       wal_receiver_status_interval * 1000))
-           return;
-       sendTime = now;
-   }
+   /* Send feedback at most once per wal_receiver_status_interval. */
+   if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
+       return;
+
+   /* Make sure we wake up when it's time to send feedback again. */
+   WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
 
    /*
     * If Hot Standby is not yet accepting connections there is nothing to
@@ -1285,6 +1303,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
    }
 }
 
+/*
+ * Compute the next wakeup time for a given wakeup reason.  Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.
+ */
+static void
+WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
+{
+   switch (reason)
+   {
+       case WALRCV_WAKEUP_TERMINATE:
+           if (wal_receiver_timeout <= 0)
+               wakeup[reason] = PG_INT64_MAX;
+           else
+               wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+           break;
+       case WALRCV_WAKEUP_PING:
+           if (wal_receiver_timeout <= 0)
+               wakeup[reason] = PG_INT64_MAX;
+           else
+               wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+           break;
+       case WALRCV_WAKEUP_HSFEEDBACK:
+           if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
+               wakeup[reason] = PG_INT64_MAX;
+           else
+               wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+           break;
+       case WALRCV_WAKEUP_REPLY:
+           if (wal_receiver_status_interval <= 0)
+               wakeup[reason] = PG_INT64_MAX;
+           else
+               wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+           break;
+       default:
+           break;
+   }
+}
+
 /*
  * Wake up the walreceiver main loop.
  *
index 9683b0a88e527667b950d8c11373da22a7d6ed89..245aea1dd14e584b7c1abef6df9579fcd7f0f7ae 100644 (file)
@@ -2927,6 +2927,7 @@ WALInsertLock
 WALInsertLockPadded
 WALOpenSegment
 WALReadError
+WalRcvWakeupReason
 WALSegmentCloseCB
 WALSegmentContext
 WALSegmentOpenCB