summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/walreceiver.c72
-rw-r--r--src/backend/replication/walsender.c115
2 files changed, 176 insertions, 11 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7005307dc25..30e35dbd28a 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -54,6 +54,9 @@
/* Global variable to indicate if this process is a walreceiver process */
bool am_walreceiver;
+/* GUC variable */
+int wal_receiver_status_interval;
+
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
@@ -88,6 +91,8 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+static StandbyReplyMessage reply_message;
+
/*
* About SIGTERM handling:
*
@@ -114,6 +119,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
+static void XLogWalRcvSendReply(void);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -306,12 +312,23 @@ WalReceiverMain(void)
while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
+ /* Let the master know that we received some data. */
+ XLogWalRcvSendReply();
+
/*
* If we've written some records, flush them to disk and let the
* startup process know about them.
*/
XLogWalRcvFlush();
}
+ else
+ {
+ /*
+ * We didn't receive anything new, but send a status update to
+ * the master anyway, to report any progress in applying WAL.
+ */
+ XLogWalRcvSendReply();
+ }
}
}
@@ -546,5 +563,60 @@ XLogWalRcvFlush(void)
LogstreamResult.Write.xrecoff);
set_ps_display(activitymsg, false);
}
+
+ /* Also let the master know that we made some progress */
+ XLogWalRcvSendReply();
}
}
+
+/*
+ * Send reply message to primary, indicating our current XLOG positions and
+ * the current time.
+ */
+static void
+XLogWalRcvSendReply(void)
+{
+ char buf[sizeof(StandbyReplyMessage) + 1];
+ TimestampTz now;
+
+ /*
+ * If the user doesn't want status to be reported to the master, be sure
+ * to exit before doing anything at all.
+ */
+ if (wal_receiver_status_interval <= 0)
+ return;
+
+ /* Get current timestamp. */
+ now = GetCurrentTimestamp();
+
+ /*
+ * We can compare the write and flush positions to the last message we
+ * sent without taking any lock, but the apply position requires a spin
+ * lock, so we don't check that unless something else has changed or 10
+ * seconds have passed. This means that the apply log position will
+ * appear, from the master's point of view, to lag slightly, but since
+ * this is only for reporting purposes and only on idle systems, that's
+ * probably OK.
+ */
+ if (XLByteEQ(reply_message.write, LogstreamResult.Write)
+ && XLByteEQ(reply_message.flush, LogstreamResult.Flush)
+ && !TimestampDifferenceExceeds(reply_message.sendTime, now,
+ wal_receiver_status_interval * 1000))
+ return;
+
+ /* Construct a new message. */
+ reply_message.write = LogstreamResult.Write;
+ reply_message.flush = LogstreamResult.Flush;
+ reply_message.apply = GetXLogReplayRecPtr();
+ reply_message.sendTime = now;
+
+ elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
+ reply_message.write.xlogid, reply_message.write.xrecoff,
+ reply_message.flush.xlogid, reply_message.flush.xrecoff,
+ reply_message.apply.xlogid, reply_message.apply.xrecoff);
+
+ /* Prepend with the message type and send it. */
+ buf[0] = 'r';
+ memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
+ walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 78963c1e6be..3ad95b495ec 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -39,6 +39,7 @@
#include "funcapi.h"
#include "access/xlog_internal.h"
+#include "access/transam.h"
#include "catalog/pg_type.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -51,6 +52,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc.h"
@@ -106,9 +108,10 @@ static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static bool XLogSend(char *msgbuf, bool *caughtup);
-static void CheckClosedConnection(void);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
+static void ProcessStandbyReplyMessage(void);
+static void ProcessRepliesIfAny(void);
/* Main entry point for walsender process */
@@ -442,7 +445,7 @@ HandleReplicationCommand(const char *cmd_string)
* Check if the remote end has closed the connection.
*/
static void
-CheckClosedConnection(void)
+ProcessRepliesIfAny(void)
{
unsigned char firstchar;
int r;
@@ -466,6 +469,13 @@ CheckClosedConnection(void)
switch (firstchar)
{
/*
+ * 'd' means a standby reply wrapped in a COPY BOTH packet.
+ */
+ case 'd':
+ ProcessStandbyReplyMessage();
+ break;
+
+ /*
* 'X' means that the standby is closing down the socket.
*/
case 'X':
@@ -479,6 +489,62 @@ CheckClosedConnection(void)
}
}
+/*
+ * Process a status update message received from standby.
+ */
+static void
+ProcessStandbyReplyMessage(void)
+{
+ static StringInfoData input_message;
+ StandbyReplyMessage reply;
+ char msgtype;
+
+ initStringInfo(&input_message);
+
+ /*
+ * Read the message contents.
+ */
+ if (pq_getmessage(&input_message, 0))
+ {
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+
+ /*
+ * Check message type from the first byte. At the moment, there is only
+ * one type.
+ */
+ msgtype = pq_getmsgbyte(&input_message);
+ if (msgtype != 'r')
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected message type %c", msgtype)));
+
+ pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage));
+
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ",
+ reply.write.xlogid, reply.write.xrecoff,
+ reply.flush.xlogid, reply.flush.xrecoff,
+ reply.apply.xlogid, reply.apply.xrecoff);
+
+ /*
+ * Update shared state for this WalSender process
+ * based on reply data from standby.
+ */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->write = reply.write;
+ walsnd->flush = reply.flush;
+ walsnd->apply = reply.apply;
+ SpinLockRelease(&walsnd->mutex);
+ }
+}
+
/* Main loop of walsender process */
static int
WalSndLoop(void)
@@ -518,6 +584,7 @@ WalSndLoop(void)
{
if (!XLogSend(output_message, &caughtup))
break;
+ ProcessRepliesIfAny();
if (caughtup)
walsender_shutdown_requested = true;
}
@@ -561,9 +628,6 @@ WalSndLoop(void)
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
WalSndDelay * 1000L);
}
-
- /* Check if the connection was closed */
- CheckClosedConnection();
}
else
{
@@ -574,6 +638,7 @@ WalSndLoop(void)
/* Update our state to indicate if we're behind or not */
WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP);
+ ProcessRepliesIfAny();
}
/*
@@ -1104,7 +1169,7 @@ WalSndGetStateString(WalSndState state)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 3
+#define PG_STAT_GET_WAL_SENDERS_COLS 6
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -1141,8 +1206,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
- char sent_location[MAXFNAMELEN];
+ char location[MAXFNAMELEN];
XLogRecPtr sentPtr;
+ XLogRecPtr write;
+ XLogRecPtr flush;
+ XLogRecPtr apply;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -1153,13 +1221,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
SpinLockAcquire(&walsnd->mutex);
sentPtr = walsnd->sentPtr;
state = walsnd->state;
+ write = walsnd->write;
+ flush = walsnd->flush;
+ apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
- snprintf(sent_location, sizeof(sent_location), "%X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
-
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(walsnd->pid);
+
if (!superuser())
{
/*
@@ -1168,11 +1237,35 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*/
nulls[1] = true;
nulls[2] = true;
+ nulls[3] = true;
+ nulls[4] = true;
+ nulls[5] = true;
}
else
{
values[1] = CStringGetTextDatum(WalSndGetStateString(state));
- values[2] = CStringGetTextDatum(sent_location);
+
+ snprintf(location, sizeof(location), "%X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff);
+ values[2] = CStringGetTextDatum(location);
+
+ if (write.xlogid == 0 && write.xrecoff == 0)
+ nulls[3] = true;
+ snprintf(location, sizeof(location), "%X/%X",
+ write.xlogid, write.xrecoff);
+ values[3] = CStringGetTextDatum(location);
+
+ if (flush.xlogid == 0 && flush.xrecoff == 0)
+ nulls[4] = true;
+ snprintf(location, sizeof(location), "%X/%X",
+ flush.xlogid, flush.xrecoff);
+ values[4] = CStringGetTextDatum(location);
+
+ if (apply.xlogid == 0 && apply.xrecoff == 0)
+ nulls[5] = true;
+ snprintf(location, sizeof(location), "%X/%X",
+ apply.xlogid, apply.xrecoff);
+ values[5] = CStringGetTextDatum(location);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);