diff options
Diffstat (limited to 'src/backend/replication')
| -rw-r--r-- | src/backend/replication/walreceiver.c | 72 | ||||
| -rw-r--r-- | src/backend/replication/walsender.c | 115 |
2 files changed, 176 insertions, 11 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7005307dc25..30e35dbd28a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -54,6 +54,9 @@ /* Global variable to indicate if this process is a walreceiver process */ bool am_walreceiver; +/* GUC variable */ +int wal_receiver_status_interval; + /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; walrcv_receive_type walrcv_receive = NULL; @@ -88,6 +91,8 @@ static struct XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ } LogstreamResult; +static StandbyReplyMessage reply_message; + /* * About SIGTERM handling: * @@ -114,6 +119,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); +static void XLogWalRcvSendReply(void); /* Signal handlers */ static void WalRcvSigHupHandler(SIGNAL_ARGS); @@ -306,12 +312,23 @@ WalReceiverMain(void) while (walrcv_receive(0, &type, &buf, &len)) XLogWalRcvProcessMsg(type, buf, len); + /* Let the master know that we received some data. */ + XLogWalRcvSendReply(); + /* * If we've written some records, flush them to disk and let the * startup process know about them. */ XLogWalRcvFlush(); } + else + { + /* + * We didn't receive anything new, but send a status update to + * the master anyway, to report any progress in applying WAL. + */ + XLogWalRcvSendReply(); + } } } @@ -546,5 +563,60 @@ XLogWalRcvFlush(void) LogstreamResult.Write.xrecoff); set_ps_display(activitymsg, false); } + + /* Also let the master know that we made some progress */ + XLogWalRcvSendReply(); } } + +/* + * Send reply message to primary, indicating our current XLOG positions and + * the current time. + */ +static void +XLogWalRcvSendReply(void) +{ + char buf[sizeof(StandbyReplyMessage) + 1]; + TimestampTz now; + + /* + * If the user doesn't want status to be reported to the master, be sure + * to exit before doing anything at all. + */ + if (wal_receiver_status_interval <= 0) + return; + + /* Get current timestamp. */ + now = GetCurrentTimestamp(); + + /* + * We can compare the write and flush positions to the last message we + * sent without taking any lock, but the apply position requires a spin + * lock, so we don't check that unless something else has changed or 10 + * seconds have passed. This means that the apply log position will + * appear, from the master's point of view, to lag slightly, but since + * this is only for reporting purposes and only on idle systems, that's + * probably OK. + */ + if (XLByteEQ(reply_message.write, LogstreamResult.Write) + && XLByteEQ(reply_message.flush, LogstreamResult.Flush) + && !TimestampDifferenceExceeds(reply_message.sendTime, now, + wal_receiver_status_interval * 1000)) + return; + + /* Construct a new message. */ + reply_message.write = LogstreamResult.Write; + reply_message.flush = LogstreamResult.Flush; + reply_message.apply = GetXLogReplayRecPtr(); + reply_message.sendTime = now; + + elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", + reply_message.write.xlogid, reply_message.write.xrecoff, + reply_message.flush.xlogid, reply_message.flush.xrecoff, + reply_message.apply.xlogid, reply_message.apply.xrecoff); + + /* Prepend with the message type and send it. */ + buf[0] = 'r'; + memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage)); + walrcv_send(buf, sizeof(StandbyReplyMessage) + 1); +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 78963c1e6be..3ad95b495ec 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -39,6 +39,7 @@ #include "funcapi.h" #include "access/xlog_internal.h" +#include "access/transam.h" #include "catalog/pg_type.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -51,6 +52,7 @@ #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pmsignal.h" +#include "storage/proc.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -106,9 +108,10 @@ static void InitWalSnd(void); static void WalSndHandshake(void); static void WalSndKill(int code, Datum arg); static bool XLogSend(char *msgbuf, bool *caughtup); -static void CheckClosedConnection(void); static void IdentifySystem(void); static void StartReplication(StartReplicationCmd * cmd); +static void ProcessStandbyReplyMessage(void); +static void ProcessRepliesIfAny(void); /* Main entry point for walsender process */ @@ -442,7 +445,7 @@ HandleReplicationCommand(const char *cmd_string) * Check if the remote end has closed the connection. */ static void -CheckClosedConnection(void) +ProcessRepliesIfAny(void) { unsigned char firstchar; int r; @@ -466,6 +469,13 @@ CheckClosedConnection(void) switch (firstchar) { /* + * 'd' means a standby reply wrapped in a COPY BOTH packet. + */ + case 'd': + ProcessStandbyReplyMessage(); + break; + + /* * 'X' means that the standby is closing down the socket. */ case 'X': @@ -479,6 +489,62 @@ CheckClosedConnection(void) } } +/* + * Process a status update message received from standby. + */ +static void +ProcessStandbyReplyMessage(void) +{ + static StringInfoData input_message; + StandbyReplyMessage reply; + char msgtype; + + initStringInfo(&input_message); + + /* + * Read the message contents. + */ + if (pq_getmessage(&input_message, 0)) + { + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + proc_exit(0); + } + + /* + * Check message type from the first byte. At the moment, there is only + * one type. + */ + msgtype = pq_getmsgbyte(&input_message); + if (msgtype != 'r') + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected message type %c", msgtype))); + + pq_copymsgbytes(&input_message, (char *) &reply, sizeof(StandbyReplyMessage)); + + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X ", + reply.write.xlogid, reply.write.xrecoff, + reply.flush.xlogid, reply.flush.xrecoff, + reply.apply.xlogid, reply.apply.xrecoff); + + /* + * Update shared state for this WalSender process + * based on reply data from standby. + */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->write = reply.write; + walsnd->flush = reply.flush; + walsnd->apply = reply.apply; + SpinLockRelease(&walsnd->mutex); + } +} + /* Main loop of walsender process */ static int WalSndLoop(void) @@ -518,6 +584,7 @@ WalSndLoop(void) { if (!XLogSend(output_message, &caughtup)) break; + ProcessRepliesIfAny(); if (caughtup) walsender_shutdown_requested = true; } @@ -561,9 +628,6 @@ WalSndLoop(void) WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock, WalSndDelay * 1000L); } - - /* Check if the connection was closed */ - CheckClosedConnection(); } else { @@ -574,6 +638,7 @@ WalSndLoop(void) /* Update our state to indicate if we're behind or not */ WalSndSetState(caughtup ? WALSNDSTATE_STREAMING : WALSNDSTATE_CATCHUP); + ProcessRepliesIfAny(); } /* @@ -1104,7 +1169,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 3 +#define PG_STAT_GET_WAL_SENDERS_COLS 6 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -1141,8 +1206,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; - char sent_location[MAXFNAMELEN]; + char location[MAXFNAMELEN]; XLogRecPtr sentPtr; + XLogRecPtr write; + XLogRecPtr flush; + XLogRecPtr apply; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -1153,13 +1221,14 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockAcquire(&walsnd->mutex); sentPtr = walsnd->sentPtr; state = walsnd->state; + write = walsnd->write; + flush = walsnd->flush; + apply = walsnd->apply; SpinLockRelease(&walsnd->mutex); - snprintf(sent_location, sizeof(sent_location), "%X/%X", - sentPtr.xlogid, sentPtr.xrecoff); - memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(walsnd->pid); + if (!superuser()) { /* @@ -1168,11 +1237,35 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ nulls[1] = true; nulls[2] = true; + nulls[3] = true; + nulls[4] = true; + nulls[5] = true; } else { values[1] = CStringGetTextDatum(WalSndGetStateString(state)); - values[2] = CStringGetTextDatum(sent_location); + + snprintf(location, sizeof(location), "%X/%X", + sentPtr.xlogid, sentPtr.xrecoff); + values[2] = CStringGetTextDatum(location); + + if (write.xlogid == 0 && write.xrecoff == 0) + nulls[3] = true; + snprintf(location, sizeof(location), "%X/%X", + write.xlogid, write.xrecoff); + values[3] = CStringGetTextDatum(location); + + if (flush.xlogid == 0 && flush.xrecoff == 0) + nulls[4] = true; + snprintf(location, sizeof(location), "%X/%X", + flush.xlogid, flush.xrecoff); + values[4] = CStringGetTextDatum(location); + + if (apply.xlogid == 0 && apply.xrecoff == 0) + nulls[5] = true; + snprintf(location, sizeof(location), "%X/%X", + apply.xlogid, apply.xrecoff); + values[5] = CStringGetTextDatum(location); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); |
