static WalReceiverConn *wrconn = NULL;
WalReceiverFunctionsType *WalReceiverFunctions = NULL;
-#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
-
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum WalRcvWakeupReason
+{
+ WALRCV_WAKEUP_TERMINATE,
+ WALRCV_WAKEUP_PING,
+ WALRCV_WAKEUP_REPLY,
+ WALRCV_WAKEUP_HSFEEDBACK,
+ NUM_WALRCV_WAKEUPS
+} WalRcvWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
+
static StringInfoData reply_message;
static StringInfoData incoming_message;
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
/*
* Process any interrupts the walreceiver process may have received.
TimeLineID primaryTLI;
bool first_stream;
WalRcvData *walrcv = WalRcv;
- TimestampTz last_recv_timestamp;
- TimestampTz starttime;
- bool ping_sent;
+ TimestampTz now;
char *err;
char *sender_host = NULL;
int sender_port = 0;
*/
Assert(walrcv != NULL);
- starttime = GetCurrentTimestamp();
+ now = GetCurrentTimestamp();
/*
* Mark walreceiver as running in shared memory.
/* Initialise to a sanish value */
walrcv->lastMsgSendTime =
- walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime;
+ walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
/* Report the latch to use to awaken this process */
walrcv->latch = &MyProc->procLatch;
initStringInfo(&reply_message);
initStringInfo(&incoming_message);
- /* Initialize the last recv timestamp */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ /* Initialize nap wakeup times. */
+ now = GetCurrentTimestamp();
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ WalRcvComputeNextWakeup(i, now);
/* Loop until end-of-streaming or error */
for (;;)
bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET;
int rc;
+ TimestampTz nextWakeup;
+ int nap;
/*
* Exit walreceiver if we're not in recovery. This should not
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+ now = GetCurrentTimestamp();
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ WalRcvComputeNextWakeup(i, now);
XLogWalRcvSendHSFeedback(true);
}
/* See if we can read data immediately */
len = walrcv_receive(wrconn, &buf, &wait_fd);
+ now = GetCurrentTimestamp();
if (len != 0)
{
/*
if (len > 0)
{
/*
- * Something was received from primary, so reset
- * timeout
+ * Something was received from primary, so adjust
+ * the ping and terminate wakeup times.
*/
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
+ now);
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
startpointTLI);
}
break;
}
len = walrcv_receive(wrconn, &buf, &wait_fd);
+ now = GetCurrentTimestamp();
}
/* Let the primary know that we received some data. */
if (endofwal)
break;
+ /* Find the soonest wakeup time, to limit our nap. */
+ nextWakeup = PG_INT64_MAX;
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ nextWakeup = Min(wakeup[i], nextWakeup);
+
+ /*
+ * Calculate the nap time. WaitLatchOrSocket() doesn't accept
+ * timeouts longer than INT_MAX milliseconds, so we limit the
+ * result accordingly. Also, we round up to the next
+ * millisecond to avoid waking up too early and spinning until
+ * one of the wakeup times.
+ */
+ nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+
/*
* Ideally we would reuse a WaitEventSet object repeatedly
* here to avoid the overheads of WaitLatchOrSocket on epoll
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
- NAPTIME_PER_CYCLE,
+ nap,
WAIT_EVENT_WAL_RECEIVER_MAIN);
+ now = GetCurrentTimestamp();
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
* Check if time since last receive from primary has
* reached the configured limit.
*/
- if (wal_receiver_timeout > 0)
- {
- TimestampTz now = GetCurrentTimestamp();
- TimestampTz timeout;
-
- timeout =
- TimestampTzPlusMilliseconds(last_recv_timestamp,
- wal_receiver_timeout);
+ if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating walreceiver due to timeout")));
- if (now >= timeout)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("terminating walreceiver due to timeout")));
-
- /*
- * We didn't receive anything new, for half of
- * receiver replication timeout. Ping the server.
- */
- if (!ping_sent)
- {
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout / 2));
- if (now >= timeout)
- {
- requestReply = true;
- ping_sent = true;
- }
- }
+ /*
+ * We didn't receive anything new, for half of receiver
+ * replication timeout. Ping the server.
+ */
+ if (now >= wakeup[WALRCV_WAKEUP_PING])
+ {
+ requestReply = true;
+ wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
}
XLogWalRcvSendReply(requestReply, requestReply);
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
- static TimestampTz sendTime = 0;
TimestampTz now;
/*
if (!force
&& writePtr == LogstreamResult.Write
&& flushPtr == LogstreamResult.Flush
- && !TimestampDifferenceExceeds(sendTime, now,
- wal_receiver_status_interval * 1000))
+ && now < wakeup[WALRCV_WAKEUP_REPLY])
return;
- sendTime = now;
+
+ /* Make sure we wake up when it's time to send another reply. */
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
/* Construct a new message */
writePtr = LogstreamResult.Write;
catalog_xmin_epoch;
TransactionId xmin,
catalog_xmin;
- static TimestampTz sendTime = 0;
/* initially true so we always send at least one feedback message */
static bool primary_has_standby_xmin = true;
/* Get current timestamp. */
now = GetCurrentTimestamp();
- if (!immed)
- {
- /*
- * Send feedback at most once per wal_receiver_status_interval.
- */
- if (!TimestampDifferenceExceeds(sendTime, now,
- wal_receiver_status_interval * 1000))
- return;
- sendTime = now;
- }
+ /* Send feedback at most once per wal_receiver_status_interval. */
+ if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
+ return;
+
+ /* Make sure we wake up when it's time to send feedback again. */
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
/*
* If Hot Standby is not yet accepting connections there is nothing to
}
}
+/*
+ * Compute the next wakeup time for a given wakeup reason. Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.
+ */
+static void
+WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
+{
+ switch (reason)
+ {
+ case WALRCV_WAKEUP_TERMINATE:
+ if (wal_receiver_timeout <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+ break;
+ case WALRCV_WAKEUP_PING:
+ if (wal_receiver_timeout <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+ break;
+ case WALRCV_WAKEUP_HSFEEDBACK:
+ if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+ break;
+ case WALRCV_WAKEUP_REPLY:
+ if (wal_receiver_status_interval <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+ break;
+ default:
+ break;
+ }
+}
+
/*
* Wake up the walreceiver main loop.
*